
Commit

Merge pull request #20 from Webiks/RF_parallel
RF parallel
shacharmo authored May 19, 2021
2 parents 0417e48 + 9525e37 commit f29afc4
Showing 9 changed files with 395 additions and 229 deletions.
43 changes: 26 additions & 17 deletions IgOmeProfiling_pipeline.py
@@ -10,15 +10,16 @@
sys.path.insert(0, src_dir)

from auxiliaries.pipeline_auxiliaries import *


def run_pipeline(fastq_path, barcode2samplename_path, samplename2biologicalcondition_path, analysis_dir, logs_dir,
left_construct, right_construct, max_mismatches_allowed, min_sequencing_quality, minimal_length_required, gz, rpm,
-                 max_msas_per_sample, max_msas_per_bc,
-                 max_number_of_cluster_members_per_sample, max_number_of_cluster_members_per_bc,
+                 max_msas_per_sample, max_msas_per_bc, max_number_of_cluster_members_per_sample, max_number_of_cluster_members_per_bc,
allowed_gap_frequency, threshold, word_length, discard, concurrent_cutoffs, meme_split_size, use_mapitope, aln_cutoff,
-                 pcc_cutoff, skip_sample_merge_meme, minimal_number_of_columns_required_create_meme, prefix_length_in_clstr, number_of_random_pssms,
+                 pcc_cutoff, skip_sample_merge_meme, minimal_number_of_columns_required_create_meme, prefix_length_in_clstr,
+                 stop_before_random_forest, number_of_random_pssms, number_parallel_random_forest, min_value_error_random_forest,
rank_method, tfidf_method, tfidf_factor, shuffles, shuffles_percent, shuffles_digits,
-                 num_of_random_configurations_to_sample, cv_num_of_splits, seed_random_forest,
+                 num_of_random_configurations_to_sample, cv_num_of_splits, seed_random_forest, random_forest_seed_configurations,
run_summary_path, error_path, queue, verbose, argv):

os.makedirs(os.path.split(run_summary_path)[0], exist_ok=True)
@@ -88,13 +89,17 @@ def run_pipeline(fastq_path, barcode2samplename_path, samplename2biologicalcondi
os.makedirs(third_phase_output_path, exist_ok=True)
third_phase_logs_path = os.path.join(logs_dir, 'model_fitting')
os.makedirs(third_phase_logs_path, exist_ok=True)

module_parameters = [first_phase_output_path, second_phase_output_path, third_phase_output_path,
third_phase_logs_path, samplename2biologicalcondition_path, number_of_random_pssms,
-                         third_phase_done_path, f'--shuffles_percent {shuffles_percent}', f'--shuffles_digits {shuffles_digits}',
-                         f'--num_of_random_configurations_to_sample {num_of_random_configurations_to_sample}', f'--cv_num_of_splits {cv_num_of_splits}', f'--seed_random_forest {seed_random_forest}',
-                         f'--rank_method {rank_method}', f'--error_path {error_path}',
-                         '-v' if verbose else '', f'-q {queue}', '-m' if use_mapitope else '']
+                         third_phase_done_path, '--stop_before_random_forest' if stop_before_random_forest else '',
+                         f'--num_of_random_configurations_to_sample {num_of_random_configurations_to_sample}',
+                         f'--number_parallel_random_forest {number_parallel_random_forest}', f'--min_value_error_random_forest {min_value_error_random_forest}',
+                         f'--shuffles_percent {shuffles_percent}', f'--shuffles_digits {shuffles_digits}',
+                         f'--cv_num_of_splits {cv_num_of_splits}', f'--seed_random_forest {seed_random_forest}',
+                         f'--random_forest_seed_configurations {random_forest_seed_configurations}', f'--error_path {error_path}', '-v' if verbose else '',
+                         f'-q {queue}', '-m' if use_mapitope else '']

if rank_method == 'tfidf':
if tfidf_method:
module_parameters += ['--tfidf_method', tfidf_method]
@@ -185,7 +190,10 @@ def run_pipeline(fastq_path, barcode2samplename_path, samplename2biologicalcondi
help='How long should be the prefix that is taken from the clstr file (cd-hit max prefix is 20)')

# optional parameters for the modelling step
+    parser.add_argument('--stop_before_random_forest', action='store_true', help='Stop the pipeline before the random forest step')
    parser.add_argument('--number_of_random_pssms', default=100, type=int, help='Number of pssm permutations')
+    parser.add_argument('--number_parallel_random_forest', default=20, type=int, help='How many random forest configurations to run in parallel')
+    parser.add_argument('--min_value_error_random_forest', default=0, type=float, help='Error threshold below which the random forest search stops early')
parser.add_argument('--rank_method', choices=['pval', 'tfidf', 'shuffles'], default='pval', help='Motifs ranking method')
parser.add_argument('--tfidf_method', choices=['boolean', 'terms', 'log', 'augmented'], default='boolean', help='TF-IDF method')
parser.add_argument('--tfidf_factor', type=float, default=0.5, help='TF-IDF augmented method factor (0-1)')
@@ -194,8 +202,8 @@ def run_pipeline(fastq_path, barcode2samplename_path, samplename2biologicalcondi
parser.add_argument('--shuffles_digits', default=2, type=int, help='Number of digits after the point to print in scanning files.')
parser.add_argument('--num_of_random_configurations_to_sample', default=100, type=int, help='How many random configurations of hyperparameters should be sampled?')
parser.add_argument('--cv_num_of_splits', default=2, help='How many folds should be in the cross validation process? (use 0 for leave one out)')
-    parser.add_argument('--seed_random_forest', default=42, help='Seed number for reconstructing experiments')
-
+    parser.add_argument('--seed_random_forest', default=42, help='Seed number for reconstructing experiments')
+    parser.add_argument('--random_forest_seed_configurations', default=123, type=int, help='Random seed value for generating random forest configurations')
# general optional parameters
parser.add_argument('--run_summary_path', type=str,
help='a file in which the running configuration and timing will be written to')
@@ -219,11 +227,12 @@ def run_pipeline(fastq_path, barcode2samplename_path, samplename2biologicalcondi

run_pipeline(args.fastq_path, args.barcode2samplename_path, args.samplename2biologicalcondition_path,
args.analysis_dir.rstrip('/'), args.logs_dir.rstrip('/'),
-                 args.left_construct, args.right_construct, args.max_mismatches_allowed, args.min_sequencing_quality, args.minimal_length_required, True if args.gz else False, True if args.rpm else False,
-                 args.max_msas_per_sample, args.max_msas_per_bc,
-                 args.max_number_of_cluster_members_per_sample, args.max_number_of_cluster_members_per_bc,
+                 args.left_construct, args.right_construct, args.max_mismatches_allowed, args.min_sequencing_quality, args.minimal_length_required, args.gz, args.rpm,
+                 args.max_msas_per_sample, args.max_msas_per_bc, args.max_number_of_cluster_members_per_sample, args.max_number_of_cluster_members_per_bc,
args.allowed_gap_frequency, args.threshold, args.word_length, args.discard, concurrent_cutoffs, args.meme_split_size,
args.mapitope, args.aln_cutoff, args.pcc_cutoff, args.skip_sample_merge_meme, args.minimal_number_of_columns_required_create_meme, args.prefix_length_in_clstr,
-                 args.number_of_random_pssms, args.rank_method, args.tfidf_method, args.tfidf_factor, args.shuffles, args.shuffles_percent, args.shuffles_digits,
-                 args.num_of_random_configurations_to_sample, args.cv_num_of_splits, args.seed_random_forest,
-                 run_summary_path, error_path, args.queue, True if args.verbose else False, sys.argv)
+                 args.stop_before_random_forest, args.number_of_random_pssms, args.number_parallel_random_forest, args.min_value_error_random_forest,
+                 args.rank_method, args.tfidf_method, args.tfidf_factor, args.shuffles, args.shuffles_percent, args.shuffles_digits,
+                 args.num_of_random_configurations_to_sample, args.cv_num_of_splits, args.seed_random_forest, args.random_forest_seed_configurations,
+                 run_summary_path, error_path, args.queue, args.verbose, sys.argv)

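A note on the pattern above: run_pipeline hands the new random-forest options to the model-fitting module as plain flag strings inside module_parameters, with '' standing in for a disabled boolean flag. A minimal, self-contained sketch of that pattern (build_rf_flags is a hypothetical helper for illustration, not code from this repository):

# Hypothetical sketch of the flag-assembly pattern used in run_pipeline above:
# boolean flags become '' when disabled, and empty placeholders are filtered
# out before the final command line is built.
def build_rf_flags(stop_before_random_forest, number_parallel_random_forest,
                   min_value_error_random_forest, seed_random_forest):
    params = [
        '--stop_before_random_forest' if stop_before_random_forest else '',
        f'--number_parallel_random_forest {number_parallel_random_forest}',
        f'--min_value_error_random_forest {min_value_error_random_forest}',
        f'--seed_random_forest {seed_random_forest}',
    ]
    return [p for p in params if p]  # drop the empty placeholders


print(' '.join(build_rf_flags(False, 20, 0, 42)))
# prints: --number_parallel_random_forest 20 --min_value_error_random_forest 0 --seed_random_forest 42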
117 changes: 64 additions & 53 deletions model_fitting/module_wraper.py
@@ -19,12 +19,13 @@ def repeat_items(list):
output.append(x)
return output


def build_classifier(first_phase_output_path, motif_inference_output_path,
-                     classification_output_path, logs_dir, samplename2biologicalcondition_path,
-                     fitting_done_path, number_of_random_pssms, rank_method, tfidf_method, tfidf_factor,
-                     shuffles, shuffles_percent, shuffles_digits, num_of_random_configurations_to_sample, cv_num_of_splits, seed_random_forest,
-                     queue_name, verbose, error_path, use_mapitope, argv):
+                     classification_output_path, logs_dir, samplename2biologicalcondition_path, number_of_random_pssms,
+                     fitting_done_path, stop_before_random_forest, num_of_random_configurations_to_sample,
+                     number_parallel_random_forest, min_value_error_random_forest, rank_method, tfidf_method, tfidf_factor,
+                     shuffles, shuffles_percent, shuffles_digits, cv_num_of_splits, random_forest_seed, random_forest_seed_configurations,
+                     queue_name, verbose, error_path, use_mapitope, argv):

is_pval = rank_method == 'pval'
os.makedirs(classification_output_path, exist_ok=True)
os.makedirs(logs_dir, exist_ok=True)
@@ -43,7 +44,6 @@ def build_classifier(first_phase_output_path, motif_inference_output_path,
scanning_dir_path = os.path.join(bc_dir_path, 'scanning')
os.makedirs(scanning_dir_path, exist_ok=True)


# compute scanning scores (hits and values)
logger.info('_'*100)
logger.info(f'{datetime.datetime.now()}: scanning peptides vs motifs (hits and values)')
@@ -146,47 +146,53 @@


# fitting a random forest model (hits and values)
-    logger.info('_'*100)
-    logger.info(f'{datetime.datetime.now()}: fitting model')
-    script_name = 'random_forest.py'
-    num_of_expected_results = 0
-    all_cmds_params = []  # a list of lists. Each sublist contain different parameters set for the same script to reduce the total number of jobs
-    for bc in biological_conditions:
-        aggregated_values_path = os.path.join(classification_output_path, bc, f'{bc}_values.csv')
-        pvalues_done_path = os.path.join(logs_dir, f'{bc}_values_done_fitting.txt')
-        aggregated_hits_path = os.path.join(classification_output_path, bc, f'{bc}_hits.csv')
-        hits_done_path = os.path.join(logs_dir, f'{bc}_hits_done_fitting.txt')
-
-        value_cmd = [aggregated_values_path, pvalues_done_path, f'--num_of_configurations_to_sample {num_of_random_configurations_to_sample}', f'--cv_num_of_splits {cv_num_of_splits}', f'--seed {seed_random_forest}', f'--rank_method {rank_method}']
-        hits_cmd = [aggregated_hits_path, hits_done_path, f'--num_of_configurations_to_sample {num_of_random_configurations_to_sample}', f'--cv_num_of_splits {cv_num_of_splits}', f'--seed {seed_random_forest}', '--rank_method hits']
-
-
-        if not os.path.exists(pvalues_done_path):
-            all_cmds_params.append(value_cmd)
-        else:
-            logger.debug(f'Skipping fitting as {pvalues_done_path} found')
-            num_of_expected_results += 1
-
-        if not os.path.exists(hits_done_path):
-            all_cmds_params.append(hits_cmd)
-        else:
-            logger.debug(f'Skipping fitting as {hits_done_path} found')
-            num_of_expected_results += 1
-
-    if len(all_cmds_params) > 0:
-        doubled_bc = repeat_items(biological_conditions)
-        for cmds_params, bc in zip(all_cmds_params, doubled_bc):
-            cmd = submit_pipeline_step(f'{src_dir}/model_fitting/{script_name}',
-                                       [cmds_params],
-                                       logs_dir, f'{bc}_model',
-                                       queue_name, verbose)
-            num_of_expected_results += 1  # a single job for each biological condition
-
-        wait_for_results(script_name, logs_dir, num_of_expected_results, example_cmd=cmd,
-                         error_file_path=error_path, suffix='_done_fitting.txt')
+    if not stop_before_random_forest:
+        logger.info('_'*100)
+        logger.info(f'{datetime.datetime.now()}: fitting model')
+        script_name = 'random_forest.py'
+        num_of_expected_results = 0
+        all_cmds_params = []  # a list of lists. Each sublist contains a different parameter set for the same script, to reduce the total number of jobs
+        for bc in biological_conditions:
+            aggregated_values_path = os.path.join(classification_output_path, bc, f'{bc}_values.csv')
+            pvalues_done_path = os.path.join(logs_dir, f'{bc}_values_done_fitting.txt')
+            aggregated_hits_path = os.path.join(classification_output_path, bc, f'{bc}_hits.csv')
+            hits_done_path = os.path.join(logs_dir, f'{bc}_hits_done_fitting.txt')
+
+            value_cmd = [aggregated_values_path, pvalues_done_path, logs_dir, error_path,
+                         f'--num_of_configurations_to_sample {num_of_random_configurations_to_sample}', f'--cv_num_of_splits {cv_num_of_splits}',
+                         f'--number_parallel_random_forest {number_parallel_random_forest}', f'--min_value_error_random_forest {min_value_error_random_forest}',
+                         f'--seed {random_forest_seed}', f'--random_forest_seed {random_forest_seed_configurations}', f'--rank_method {rank_method}']
+            hits_cmd = [aggregated_hits_path, hits_done_path, logs_dir, error_path,
+                        f'--num_of_configurations_to_sample {num_of_random_configurations_to_sample}', f'--cv_num_of_splits {cv_num_of_splits}',
+                        f'--number_parallel_random_forest {number_parallel_random_forest}', f'--min_value_error_random_forest {min_value_error_random_forest}',
+                        f'--seed {random_forest_seed}', f'--random_forest_seed {random_forest_seed_configurations}', '--rank_method hits']
+            if rank_method == 'tfidf' or rank_method == 'shuffles':
+                value_cmd.append('--tfidf')
+                hits_cmd.append('--tfidf')
+            if not os.path.exists(pvalues_done_path):
+                all_cmds_params.append(value_cmd)
+            else:
+                logger.debug(f'Skipping fitting as {pvalues_done_path} found')
+                num_of_expected_results += 1
+
+            if not os.path.exists(hits_done_path):
+                all_cmds_params.append(hits_cmd)
+            else:
+                logger.debug(f'Skipping fitting as {hits_done_path} found')
+                num_of_expected_results += 1
+
+        if len(all_cmds_params) > 0:
+            doubled_bc = repeat_items(biological_conditions)
+            for cmds_params, bc in zip(all_cmds_params, doubled_bc):
+                cmd = submit_pipeline_step(f'{src_dir}/model_fitting/{script_name}',
+                                           [cmds_params],
+                                           logs_dir, f'{bc}_model',
+                                           queue_name, verbose)
+                num_of_expected_results += 1  # a single job for each biological condition
+
+            wait_for_results(script_name, logs_dir, num_of_expected_results, example_cmd=cmd,
+                             error_file_path=error_path, suffix='_done_fitting.txt')
+        else:
+            logger.info('Skipping fitting, all results found')
+    else:
+        logger.info('Stop before random forest')

# TODO: fix this bug with a GENERAL WRAPPER done_path
# wait_for_results(script_name, num_of_expected_results)
@@ -216,15 +222,19 @@ def get_faa_file_name_from_path(path, use_mapitope):
parser.add_argument('number_of_random_pssms', default=100, type=int, help='Number of pssm permutations')
parser.add_argument('done_file_path', help='A path to a file that signals that the module finished running successfully.')

-    parser.add_argument('--rank_method', choices=['pval', 'tfidf', 'shuffles'], default='shuffles', help='Motifs ranking method')
+    parser.add_argument('--stop_before_random_forest', action='store_true', help='Stop the pipeline before the random forest step')
+    parser.add_argument('--num_of_random_configurations_to_sample', default=100, type=int, help='How many random configurations of hyperparameters should be sampled?')
+    parser.add_argument('--number_parallel_random_forest', default=20, type=int, help='How many random forest configurations to run in parallel')
+    parser.add_argument('--min_value_error_random_forest', default=0, type=float, help='Error threshold below which the random forest search stops early')
+    parser.add_argument('--rank_method', choices=['pval', 'tfidf', 'shuffles'], default='pval', help='Motifs ranking method')
parser.add_argument('--tfidf_method', choices=['boolean', 'terms', 'log', 'augmented'], default='boolean', help='TF-IDF method')
parser.add_argument('--tfidf_factor', type=float, default=0.5, help='TF-IDF augmented method factor (0-1)')
parser.add_argument('--shuffles', default=5, type=int, help='Number of controlled shuffles permutations')
parser.add_argument('--shuffles_percent', default=0.2, type=float, help='Percent from shuffle with greatest number of hits (0-1)')
parser.add_argument('--shuffles_digits', default=2, type=int, help='Number of digits after the point to print in scanning files.')
-    parser.add_argument('--num_of_random_configurations_to_sample', default=100, type=int, help='How many random configurations of hyperparameters should be sampled?')
    parser.add_argument('--cv_num_of_splits', default=2, type=int, help='How many folds should be in the cross validation process? (use 0 for leave one out)')
parser.add_argument('--seed_random_forest', default=42, type=int, help='Seed number for reconstructing experiments')
+    parser.add_argument('--random_forest_seed_configurations', default=123, type=int, help='Random seed value for generating random forest configurations')
parser.add_argument('--error_path', type=str, help='a file in which errors will be written to')
parser.add_argument('-q', '--queue', default='pupkoweb', type=str, help='a queue to which the jobs will be submitted')
parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
Expand All @@ -241,8 +251,9 @@ def get_faa_file_name_from_path(path, use_mapitope):
error_path = args.error_path if args.error_path else os.path.join(args.parsed_fastq_results, 'error.txt')

build_classifier(args.parsed_fastq_results, args.motif_inference_results, args.classification_output_path,
-                     args.logs_dir, args.samplename2biologicalcondition_path, args.done_file_path,
-                     args.number_of_random_pssms, args.rank_method, args.tfidf_method, args.tfidf_factor,
-                     args.shuffles, args.shuffles_percent, args.shuffles_digits, args.num_of_random_configurations_to_sample,
-                     args.cv_num_of_splits, args.seed_random_forest, args.queue,
-                     True if args.verbose else False, error_path, args.mapitope, sys.argv)
+                     args.logs_dir, args.samplename2biologicalcondition_path, args.number_of_random_pssms, args.done_file_path,
+                     args.stop_before_random_forest, args.num_of_random_configurations_to_sample,
+                     args.number_parallel_random_forest, args.min_value_error_random_forest, args.rank_method,
+                     args.tfidf_method, args.tfidf_factor, args.shuffles, args.shuffles_percent, args.shuffles_digits,
+                     args.cv_num_of_splits, args.seed_random_forest, args.random_forest_seed_configurations,
+                     args.queue, args.verbose, error_path, args.mapitope, sys.argv)
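random_forest.py itself is not shown on this page; the pipeline runs it as queue jobs, one per biological condition, via submit_pipeline_step. As a rough local sketch of the behavior that --number_parallel_random_forest and --min_value_error_random_forest suggest (evaluate sampled configurations in batches, stop once the best cross-validation error reaches the threshold), assuming a scikit-learn-style random forest; all names below are illustrative, not repo code:

# Hypothetical sketch, NOT the repo's random_forest.py: sample random
# hyperparameter configurations, evaluate them in batches of `parallel`
# (cf. --number_parallel_random_forest), and stop early once the best
# cross-validation error reaches `min_error` (cf. --min_value_error_random_forest).
# The real pipeline distributes this work as queue jobs, not local processes.
import random
from concurrent.futures import ProcessPoolExecutor

from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score


def evaluate(config, X, y, cv_num_of_splits=2):
    # error = 1 - mean cross-validated accuracy
    model = RandomForestClassifier(**config)
    return 1 - cross_val_score(model, X, y, cv=cv_num_of_splits).mean(), config


def search(X, y, num_configs=100, parallel=20, min_error=0.0, seed_configurations=123):
    rng = random.Random(seed_configurations)  # cf. --random_forest_seed_configurations
    configs = [{'n_estimators': rng.choice([50, 100, 200]),
                'max_depth': rng.choice([None, 5, 10]),
                'random_state': 42}  # cf. --seed_random_forest
               for _ in range(num_configs)]
    best = (float('inf'), None)
    with ProcessPoolExecutor(max_workers=parallel) as pool:
        for start in range(0, len(configs), parallel):  # one batch at a time
            batch = configs[start:start + parallel]
            results = pool.map(evaluate, batch, [X] * len(batch), [y] * len(batch))
            best = min([best, *results], key=lambda t: t[0])
            if best[0] <= min_error:  # converged: skip the remaining batches
                break
    return best


if __name__ == '__main__':
    X, y = make_classification(n_samples=200, n_features=20, random_state=0)
    error, config = search(X, y, num_configs=10, parallel=4)
    print(f'best error {error:.3f} with {config}')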
