From 806069341169c506777efa9727ab26f616d1cfcb Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sat, 12 Nov 2011 16:50:58 -0500 Subject: [PATCH] multithreading works again --- bwtsw2_aux.c | 46 +++++++++++++++++++++++++++++++--------------- bwtsw2_pair.c | 42 ++++++++++++++++++++++++++++++------------ main.c | 46 ++++++++++++++++++++++++++++------------------ utils.c | 16 ++++++++++++++++ utils.h | 3 +++ 5 files changed, 108 insertions(+), 45 deletions(-) diff --git a/bwtsw2_aux.c b/bwtsw2_aux.c index 6dc4f367..5d57ba85 100644 --- a/bwtsw2_aux.c +++ b/bwtsw2_aux.c @@ -71,9 +71,11 @@ bwtsw2_t *bsw2_dup_no_cigar(const bwtsw2_t *b) bwtsw2_t *p; p = calloc(1, sizeof(bwtsw2_t)); p->max = p->n = b->n; - kroundup32(p->max); - p->hits = calloc(p->max, sizeof(bsw2hit_t)); - memcpy(p->hits, b->hits, p->n * sizeof(bsw2hit_t)); + if (b->n) { + kroundup32(p->max); + p->hits = calloc(p->max, sizeof(bsw2hit_t)); + memcpy(p->hits, b->hits, p->n * sizeof(bsw2hit_t)); + } return p; } @@ -530,7 +532,7 @@ static void print_hits(const bntseq_t *bns, const bsw2opt_t *opt, bsw2seq1_t *ks /* Core routine to align reads in _seq. It is separated from * process_seqs() to realize multi-threading */ -static void bsw2_aln_core(int tid, bsw2seq_t *_seq, const bsw2opt_t *_opt, const bntseq_t *bns, uint8_t *pac, const bwt_t *target, int is_pe) +static void bsw2_aln_core(bsw2seq_t *_seq, const bsw2opt_t *_opt, const bntseq_t *bns, uint8_t *pac, const bwt_t *target, int is_pe) { int x; bsw2opt_t opt = *_opt; @@ -543,11 +545,6 @@ static void bsw2_aln_core(int tid, bsw2seq_t *_seq, const bsw2opt_t *_opt, const int i, l, k; bwtsw2_t *b[2]; l = p->l; - -#ifdef HAVE_PTHREAD - if (x % _opt->n_threads != tid) continue; -#endif - // set opt->t opt.t = _opt->t; if (opt.t < log(l) * opt.coef) opt.t = (int)(log(l) * opt.coef + .499); @@ -642,7 +639,7 @@ typedef struct { static void *worker(void *data) { thread_aux_t *p = (thread_aux_t*)data; - bsw2_aln_core(p->tid, p->_seq, p->_opt, p->bns, p->pac, p->target, p->is_pe); + bsw2_aln_core(p->_seq, p->_opt, p->bns, p->pac, p->target, p->is_pe); return 0; } #endif @@ -652,10 +649,11 @@ static void *worker(void *data) static void process_seqs(bsw2seq_t *_seq, const bsw2opt_t *opt, const bntseq_t *bns, uint8_t *pac, const bwt_t *target, int is_pe) { int i; + is_pe = is_pe? 1 : 0; #ifdef HAVE_PTHREAD if (opt->n_threads <= 1) { - bsw2_aln_core(0, _seq, opt, bns, pac, target, is_pe); + bsw2_aln_core(_seq, opt, bns, pac, target, is_pe); } else { pthread_t *tid; pthread_attr_t attr; @@ -667,15 +665,33 @@ static void process_seqs(bsw2seq_t *_seq, const bsw2opt_t *opt, const bntseq_t * tid = (pthread_t*)calloc(opt->n_threads, sizeof(pthread_t)); for (j = 0; j < opt->n_threads; ++j) { thread_aux_t *p = data + j; - p->tid = j; p->_seq = _seq; p->_opt = opt; p->bns = bns; p->is_pe = is_pe; + p->tid = j; p->_opt = opt; p->bns = bns; p->is_pe = is_pe; p->pac = pac; p->target = target; - pthread_create(&tid[j], &attr, worker, p); + p->_seq = calloc(1, sizeof(bsw2seq_t)); + p->_seq->max = (_seq->n + opt->n_threads - 1) / opt->n_threads + 1; + p->_seq->n = 0; + p->_seq->seq = calloc(p->_seq->max, sizeof(bsw2seq1_t)); } + for (i = 0; i < _seq->n; ++i) { // assign sequences to each thread + bsw2seq_t *p = data[(i>>is_pe)%opt->n_threads]._seq; + p->seq[p->n++] = _seq->seq[i]; + } + for (j = 0; j < opt->n_threads; ++j) pthread_create(&tid[j], &attr, worker, &data[j]); for (j = 0; j < opt->n_threads; ++j) pthread_join(tid[j], 0); + for (j = 0; j < opt->n_threads; ++j) data[j]._seq->n = 0; + for (i = 0; i < _seq->n; ++i) { // copy the result from each thread back + bsw2seq_t *p = data[(i>>is_pe)%opt->n_threads]._seq; + _seq->seq[i] = p->seq[p->n++]; + } + for (j = 0; j < opt->n_threads; ++j) { + thread_aux_t *p = data + j; + free(p->_seq->seq); + free(p->_seq); + } free(data); free(tid); } #else - bsw2_aln_core(0, _seq, opt, bns, pac, target, is_pe); + bsw2_aln_core(_seq, opt, bns, pac, target, is_pe); #endif // print and reset @@ -744,7 +760,7 @@ void bsw2_aln(const bsw2opt_t *opt, const bntseq_t *bns, bwt_t * const target, c is_pe = 0; } } - if (size > opt->chunk_size) { + if (size > opt->chunk_size * opt->n_threads) { fprintf(stderr, "[bsw2_aln] read %d sequences/pairs (%d bp)...\n", _seq->n, size); process_seqs(_seq, opt, bns, pac, target, is_pe); size = 0; diff --git a/bwtsw2_pair.c b/bwtsw2_pair.c index 5a831990..581e19c8 100644 --- a/bwtsw2_pair.c +++ b/bwtsw2_pair.c @@ -6,6 +6,7 @@ #include "bntseq.h" #include "bwtsw2.h" #include "ksw.h" +#include "kstring.h" #define MAX_INS 20000 #define MIN_RATIO 0.8 @@ -14,17 +15,18 @@ #define EXT_STDDEV 4.0 typedef struct { - int low, high; + int low, high, failed; double avg, std; } bsw2pestat_t; -bsw2pestat_t bsw2_stat(int n, bwtsw2_t **buf) +bsw2pestat_t bsw2_stat(int n, bwtsw2_t **buf, kstring_t *msg) { extern void ks_introsort_uint64_t(size_t n, uint64_t *a); int i, k, x, p25, p50, p75, tmp, max_len = 0; uint64_t *isize; bsw2pestat_t r; + memset(&r, 0, sizeof(bsw2pestat_t)); isize = calloc(n, 8); for (i = k = 0; i < n; i += 2) { bsw2hit_t *t[2]; @@ -42,11 +44,19 @@ bsw2pestat_t bsw2_stat(int n, bwtsw2_t **buf) p25 = isize[(int)(.25 * k + .499)]; p50 = isize[(int)(.50 * k + .499)]; p75 = isize[(int)(.75 * k + .499)]; + ksprintf(msg, "[%s] infer the insert size distribution from %d high-quality pairs.\n", __func__, k); + if (k < 8) { + ksprintf(msg, "[%s] fail to infer the insert size distribution.\n", __func__); + free(isize); + r.failed = 1; + return r; + } tmp = (int)(p25 - OUTLIER_BOUND * (p75 - p25) + .499); r.low = tmp > max_len? tmp : max_len; + if (r.low < 1) r.low = 1; r.high = (int)(p75 + OUTLIER_BOUND * (p75 - p25) + .499); - fprintf(stderr, "[%s] (25, 50, 75) percentile: (%d, %d, %d)\n", __func__, p25, p50, p75); - fprintf(stderr, "[%s] low and high boundaries for computing mean and std.dev: (%d, %d)\n", __func__, r.low, r.high); + ksprintf(msg, "[%s] (25, 50, 75) percentile: (%d, %d, %d)\n", __func__, p25, p50, p75); + ksprintf(msg, "[%s] low and high boundaries for computing mean and std.dev: (%d, %d)\n", __func__, r.low, r.high); for (i = x = 0, r.avg = 0; i < k; ++i) if (isize[i] >= r.low && isize[i] <= r.high) r.avg += isize[i], ++x; @@ -55,14 +65,15 @@ bsw2pestat_t bsw2_stat(int n, bwtsw2_t **buf) if (isize[i] >= r.low && isize[i] <= r.high) r.std += (isize[i] - r.avg) * (isize[i] - r.avg); r.std = sqrt(r.std / x); - fprintf(stderr, "[%s] mean and std.dev: (%.2f, %.2f)\n", __func__, r.avg, r.std); + ksprintf(msg, "[%s] mean and std.dev: (%.2f, %.2f)\n", __func__, r.avg, r.std); tmp = (int)(p25 - 3. * (p75 - p25) + .499); r.low = tmp > max_len? tmp : max_len; + if (r.low < 1) r.low = 1; r.high = (int)(p75 + 3. * (p75 - p25) + .499); if (r.low > r.avg - MAX_STDDEV * 4.) r.low = (int)(r.avg - MAX_STDDEV * 4. + .499); r.low = tmp > max_len? tmp : max_len; if (r.high < r.avg - MAX_STDDEV * 4.) r.high = (int)(r.avg + MAX_STDDEV * 4. + .499); - fprintf(stderr, "[%s] low and high boundaries for proper pairs: (%d, %d)\n", __func__, r.low, r.high); + ksprintf(msg, "[%s] low and high boundaries for proper pairs: (%d, %d)\n", __func__, r.low, r.high); free(isize); return r; } @@ -74,9 +85,8 @@ typedef struct { } pairaux_t; extern unsigned char nst_nt4_table[256]; -static int8_t g_mat[25]; -void bsw2_pair1(const bsw2opt_t *opt, int64_t l_pac, const uint8_t *pac, const bsw2pestat_t *st, const bsw2hit_t *h, int l_mseq, const char *mseq, bsw2hit_t *a) +void bsw2_pair1(const bsw2opt_t *opt, int64_t l_pac, const uint8_t *pac, const bsw2pestat_t *st, const bsw2hit_t *h, int l_mseq, const char *mseq, bsw2hit_t *a, int8_t g_mat[25]) { extern void seq_reverse(int len, ubyte_t *seq, int is_comp); int64_t k, beg, end; @@ -88,11 +98,13 @@ void bsw2_pair1(const bsw2opt_t *opt, int64_t l_pac, const uint8_t *pac, const b a->n_seeds = 1; a->flag |= BSW2_FLAG_MATESW; // before calling this routine, *a has been cleared with memset(0); the flag is set with 1<<6/7 if (h->is_rev == 0) { beg = (int64_t)(h->k + st->avg - EXT_STDDEV * st->std - l_mseq + .499); + if (beg < h->k) beg = h->k; end = (int64_t)(h->k + st->avg + EXT_STDDEV * st->std + .499); a->is_rev = 1; a->flag |= 16; } else { beg = (int64_t)(h->k + h->end - h->beg - st->avg - EXT_STDDEV * st->std + .499); end = (int64_t)(h->k + h->end - h->beg - st->avg + EXT_STDDEV * st->std + l_mseq + .499); + if (end > h->k + (h->end - h->beg)) end = h->k + (h->end - h->beg); a->is_rev = 0; } if (beg < 1) beg = 1; @@ -146,7 +158,10 @@ void bsw2_pair(const bsw2opt_t *opt, int64_t l_pac, const uint8_t *pac, int n, b extern int bsw2_resolve_duphits(const bntseq_t *bns, const bwt_t *bwt, bwtsw2_t *b, int IS); bsw2pestat_t pes; int i, j, k, n_rescued = 0, n_moved = 0, n_fixed = 0; - pes = bsw2_stat(n, hits); + int8_t g_mat[25]; + kstring_t msg; + memset(&msg, 0, sizeof(kstring_t)); + pes = bsw2_stat(n, hits, &msg); for (i = k = 0; i < 5; ++i) { for (j = 0; j < 4; ++j) g_mat[k++] = i == j? opt->a : -opt->b; @@ -163,11 +178,12 @@ void bsw2_pair(const bsw2opt_t *opt, int64_t l_pac, const uint8_t *pac, int n, b p->flag |= 1<<(6+j); } } + if (pes.failed) continue; if (hits[i] == 0 || hits[i+1] == 0) continue; // one end has excessive N if (hits[i]->n != 1 && hits[i+1]->n != 1) continue; // no end has exactly one hit if (hits[i]->n > 1 || hits[i+1]->n > 1) continue; // one read has more than one hit - if (hits[i+0]->n == 1) bsw2_pair1(opt, l_pac, pac, &pes, &hits[i+0]->hits[0], seq[i+1].l, seq[i+1].seq, &a[1]); - if (hits[i+1]->n == 1) bsw2_pair1(opt, l_pac, pac, &pes, &hits[i+1]->hits[0], seq[i+0].l, seq[i+0].seq, &a[0]); + if (hits[i+0]->n == 1) bsw2_pair1(opt, l_pac, pac, &pes, &hits[i+0]->hits[0], seq[i+1].l, seq[i+1].seq, &a[1], g_mat); + if (hits[i+1]->n == 1) bsw2_pair1(opt, l_pac, pac, &pes, &hits[i+1]->hits[0], seq[i+0].l, seq[i+0].seq, &a[0], g_mat); // the following enumerate all possibilities. It is tedious but necessary... if (hits[i]->n + hits[i+1]->n == 1) { // one end mapped; the other not; bwtsw2_t *p[2]; @@ -242,5 +258,7 @@ void bsw2_pair(const bsw2opt_t *opt, int64_t l_pac, const uint8_t *pac, int n, b } } } - fprintf(stderr, "[%s] #fixed=%d, #rescued=%d, #moved=%d\n", __func__, n_fixed, n_rescued, n_moved); + ksprintf(&msg, "[%s] #fixed=%d, #rescued=%d, #moved=%d\n", __func__, n_fixed, n_rescued, n_moved); + fputs(msg.s, stderr); + free(msg.s); } diff --git a/main.c b/main.c index edebe0bc..8121f362 100644 --- a/main.c +++ b/main.c @@ -39,28 +39,38 @@ void bwa_print_sam_PG() int main(int argc, char *argv[]) { + int i, ret; + double t_real; + t_real = realtime(); if (argc < 2) return usage(); - if (strcmp(argv[1], "fa2pac") == 0) return bwa_fa2pac(argc-1, argv+1); - else if (strcmp(argv[1], "pac2bwt") == 0) return bwa_pac2bwt(argc-1, argv+1); - else if (strcmp(argv[1], "pac2bwtgen") == 0) return bwt_bwtgen_main(argc-1, argv+1); - else if (strcmp(argv[1], "bwtupdate") == 0) return bwa_bwtupdate(argc-1, argv+1); - else if (strcmp(argv[1], "bwt2sa") == 0) return bwa_bwt2sa(argc-1, argv+1); - else if (strcmp(argv[1], "index") == 0) return bwa_index(argc-1, argv+1); - else if (strcmp(argv[1], "aln") == 0) return bwa_aln(argc-1, argv+1); - else if (strcmp(argv[1], "sw") == 0) return bwa_stdsw(argc-1, argv+1); - else if (strcmp(argv[1], "samse") == 0) return bwa_sai2sam_se(argc-1, argv+1); - else if (strcmp(argv[1], "sampe") == 0) return bwa_sai2sam_pe(argc-1, argv+1); - else if (strcmp(argv[1], "pac2cspac") == 0) return bwa_pac2cspac(argc-1, argv+1); - else if (strcmp(argv[1], "stdsw") == 0) return bwa_stdsw(argc-1, argv+1); - else if (strcmp(argv[1], "bwtsw2") == 0) return bwa_bwtsw2(argc-1, argv+1); - else if (strcmp(argv[1], "dbwtsw") == 0) return bwa_bwtsw2(argc-1, argv+1); - else if (strcmp(argv[1], "bwasw") == 0) return bwa_bwtsw2(argc-1, argv+1); - else if (strcmp(argv[1], "fastmap") == 0) return main_fastmap(argc-1, argv+1); + if (strcmp(argv[1], "fa2pac") == 0) ret = bwa_fa2pac(argc-1, argv+1); + else if (strcmp(argv[1], "pac2bwt") == 0) ret = bwa_pac2bwt(argc-1, argv+1); + else if (strcmp(argv[1], "pac2bwtgen") == 0) ret = bwt_bwtgen_main(argc-1, argv+1); + else if (strcmp(argv[1], "bwtupdate") == 0) ret = bwa_bwtupdate(argc-1, argv+1); + else if (strcmp(argv[1], "bwt2sa") == 0) ret = bwa_bwt2sa(argc-1, argv+1); + else if (strcmp(argv[1], "index") == 0) ret = bwa_index(argc-1, argv+1); + else if (strcmp(argv[1], "aln") == 0) ret = bwa_aln(argc-1, argv+1); + else if (strcmp(argv[1], "sw") == 0) ret = bwa_stdsw(argc-1, argv+1); + else if (strcmp(argv[1], "samse") == 0) ret = bwa_sai2sam_se(argc-1, argv+1); + else if (strcmp(argv[1], "sampe") == 0) ret = bwa_sai2sam_pe(argc-1, argv+1); + else if (strcmp(argv[1], "pac2cspac") == 0) ret = bwa_pac2cspac(argc-1, argv+1); + else if (strcmp(argv[1], "stdsw") == 0) ret = bwa_stdsw(argc-1, argv+1); + else if (strcmp(argv[1], "bwtsw2") == 0) ret = bwa_bwtsw2(argc-1, argv+1); + else if (strcmp(argv[1], "dbwtsw") == 0) ret = bwa_bwtsw2(argc-1, argv+1); + else if (strcmp(argv[1], "bwasw") == 0) ret = bwa_bwtsw2(argc-1, argv+1); + else if (strcmp(argv[1], "fastmap") == 0) ret = main_fastmap(argc-1, argv+1); else { fprintf(stderr, "[main] unrecognized command '%s'\n", argv[1]); return 1; } - err_fflush(stdout); - err_fclose(stdout); + err_fflush(stdout); + err_fclose(stdout); + if (ret == 0) { + fprintf(stderr, "[%s] Version: %s\n", __func__, PACKAGE_VERSION); + fprintf(stderr, "[%s] CMD:", __func__); + for (i = 0; i < argc; ++i) + fprintf(stderr, " %s", argv[i]); + fprintf(stderr, "\n[%s] Real time: %.3f sec; CPU: %.3f sec\n", __func__, realtime() - t_real, cputime()); + } return 0; } diff --git a/utils.c b/utils.c index d47ec5c2..8c1ad7ec 100644 --- a/utils.c +++ b/utils.c @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include "utils.h" FILE *err_xopen_core(const char *func, const char *fn, const char *mode) @@ -146,3 +148,17 @@ int err_fclose(FILE *stream) return ret; } +double cputime() +{ + struct rusage r; + getrusage(RUSAGE_SELF, &r); + return r.ru_utime.tv_sec + r.ru_stime.tv_sec + 1e-6 * (r.ru_utime.tv_usec + r.ru_stime.tv_usec); +} + +double realtime() +{ + struct timeval tp; + struct timezone tzp; + gettimeofday(&tp, &tzp); + return tp.tv_sec + tp.tv_usec * 1e-6; +} diff --git a/utils.h b/utils.h index a7fecbc3..b6839e98 100644 --- a/utils.h +++ b/utils.h @@ -63,6 +63,9 @@ extern "C" { int err_fflush(FILE *stream); int err_fclose(FILE *stream); + double cputime(); + double realtime(); + #ifdef __cplusplus } #endif