From 34c603a17d6f73b1c4afbb65e333b3b6e46cf2e3 Mon Sep 17 00:00:00 2001 From: Will Rowe Date: Fri, 8 May 2020 13:37:50 +0100 Subject: [PATCH 1/4] fixing data race when read maps to multiple graphs --- src/pipeline/boss.go | 77 ++++++++++++++++++++++++++----------- src/pipeline/graphminion.go | 32 +++++++-------- src/pipeline/sketch.go | 9 ----- src/seqio/seqio.go | 60 +++++++++++++++++++++-------- 4 files changed, 114 insertions(+), 64 deletions(-) diff --git a/src/pipeline/boss.go b/src/pipeline/boss.go index b225bbe..137fbd9 100644 --- a/src/pipeline/boss.go +++ b/src/pipeline/boss.go @@ -1,8 +1,8 @@ package pipeline import ( + "fmt" "io" - "log" "os" "sync" "time" @@ -20,7 +20,7 @@ type theBoss struct { refSAMheaders map[int][]*sam.Reference // map of SAM headers for each reference sequence, indexed by path ID reads chan *seqio.FASTQread // the boss uses this channel to receive data from the main sketching pipeline alignments chan *sam.Record // used to receive alignments from the graph minions - outFile io.Writer // destination for the BAM output + bamwriter *bam.Writer // destination for the BAM output receivedReadCount int // the number of reads the boss is sent during it's lifetime mappedCount int // the total number of reads that were successful mapped to at least one graph multimappedCount int // the total number of reads that had mappings to multiple graphs @@ -33,7 +33,7 @@ func newBoss(runtimeInfo *Info, inputChan chan *seqio.FASTQread) *theBoss { return &theBoss{ info: runtimeInfo, reads: inputChan, - outFile: os.Stdout, + alignments: make(chan *sam.Record, BUFFERSIZE), receivedReadCount: 0, mappedCount: 0, multimappedCount: 0, @@ -41,8 +41,8 @@ func newBoss(runtimeInfo *Info, inputChan chan *seqio.FASTQread) *theBoss { } } -// mapReads is a method to start off the minions to map and align reads, the minions to augment graphs, and collate the alignments -func (theBoss *theBoss) mapReads() error { +// setupBAM will set up the BAM STDOUT for reporting exact graph alignments +func (theBoss *theBoss) setupBAM() error { // get the SAM headers samHeaders, err := theBoss.info.Store.GetSAMrefs() @@ -50,7 +50,6 @@ func (theBoss *theBoss) mapReads() error { return err } theBoss.refSAMheaders = samHeaders - theBoss.alignments = make(chan *sam.Record, BUFFERSIZE) // get program info for SAM header (unique ID, name, command, previous program ID, version) programInfo := sam.NewProgram("1", "groot", "groot align", "", version.GetVersion()) @@ -84,11 +83,37 @@ func (theBoss *theBoss) mapReads() error { return err } + // use a BAM file or STDOUT (TODO: not exposed to CLI yet) + var fh io.Writer + if theBoss.info.Sketch.BAMout != "" { + var err error + fh, err = os.Create(theBoss.info.Sketch.BAMout) + if err != nil { + return (fmt.Errorf("could not open file for BAM writing: %v", err)) + } + } else { + fh = os.Stdout + } + // create the bam writer and write the header - bw, err := bam.NewWriter(theBoss.outFile, header, 0) + bw, err := bam.NewWriter(fh, header, 0) if err != nil { return err } + theBoss.bamwriter = bw + return nil +} + +// mapReads is a method to start off the minions to map and align reads, the minions to augment graphs, and collate the alignments +func (theBoss *theBoss) mapReads() error { + theBoss.alignments = make(chan *sam.Record, BUFFERSIZE) + + // set up the BAM if exact alignment is requested + if !theBoss.info.Sketch.NoExactAlign { + if err := theBoss.setupBAM(); err != nil { + return err + } + } // setup the waitgroups for the sketching and graphing minions var wg1 sync.WaitGroup @@ -96,15 +121,14 @@ func (theBoss *theBoss) mapReads() error { // launch the graph minions (one minion per graph in the index) theBoss.graphMinionRegister = make([]*graphMinion, len(theBoss.info.Store)) - for graphID, graph := range theBoss.info.Store { + for _, graph := range theBoss.info.Store { // create, start and register the graph minion - minion := newGraphMinion(graphID, graph, theBoss.alignments, theBoss.refSAMheaders[int(graphID)], theBoss) + minion := newGraphMinion(theBoss, graph) wg2.Add(1) minion.start(&wg2) - theBoss.graphMinionRegister[graphID] = minion + theBoss.graphMinionRegister[graph.GraphID] = minion } - log.Printf("\tgraph workers: %d", len(theBoss.graphMinionRegister)) // launch the sketching minions (one per CPU) for i := 0; i < theBoss.info.NumProc; i++ { @@ -149,18 +173,21 @@ func (theBoss *theBoss) mapReads() error { if err != nil { panic(err) } - for graphID, hits := range results { - // make a copy of the read - readCopy := seqio.FASTQread{ - Sequence: read.Sequence, - Misc: read.Misc, - Qual: read.Qual, - RC: read.RC, - } + // if multiple graphs are returned, we need to deep copy the read + deepCopy := false + if len(results) > 1 { + deepCopy = true + } - // send the windows to the correct go routine for read alignment and graph augmentation - theBoss.graphMinionRegister[graphID].inputChannel <- &graphMinionPair{hits, readCopy} + // augment graphs and optionally perform exact alignment + for graphID, hits := range results { + if deepCopy { + readCopy := *read.DeepCopy() + theBoss.graphMinionRegister[graphID].inputChannel <- &graphMinionPair{hits, readCopy} + } else { + theBoss.graphMinionRegister[graphID].inputChannel <- &graphMinionPair{hits, *read} + } } // update counts @@ -201,11 +228,15 @@ func (theBoss *theBoss) mapReads() error { // os.Exit(1) //} theBoss.alignmentCount++ - if err := bw.Write(record); err != nil { + if err := theBoss.bamwriter.Write(record); err != nil { return err } } // close the bam writer and return to the completed boss to the pipeline - return bw.Close() + var err error + if !theBoss.info.Sketch.NoExactAlign { + err = theBoss.bamwriter.Close() + } + return err } diff --git a/src/pipeline/graphminion.go b/src/pipeline/graphminion.go index 21e3a6e..5ef6b2a 100644 --- a/src/pipeline/graphminion.go +++ b/src/pipeline/graphminion.go @@ -18,29 +18,31 @@ type graphMinionPair struct { // graphMinion holds a graph and is responsible for augmenting the paths when new mapping data arrives type graphMinion struct { - id uint32 - graph *graph.GrootGraph - inputChannel chan *graphMinionPair - outputChannel chan *sam.Record - runAlignment bool - references []*sam.Reference // the SAM references for each path in this graph - boss *theBoss // pointer to the boss so the minion can access the runtime info (e.g. k-mer size) + boss *theBoss // pointer to the boss so the minion can access the runtime info (e.g. k-mer size) and channels etc + id uint32 // id corresponds to the graphID + graph *graph.GrootGraph + inputChannel chan *graphMinionPair + runAlignment bool + references []*sam.Reference // the SAM references for each path in this graph } // newGraphMinion is the constructor function -func newGraphMinion(id uint32, graph *graph.GrootGraph, alignmentChan chan *sam.Record, refs []*sam.Reference, boss *theBoss) *graphMinion { +func newGraphMinion(boss *theBoss, graph *graph.GrootGraph) *graphMinion { return &graphMinion{ - id: id, - graph: graph, - inputChannel: make(chan *graphMinionPair, BUFFERSIZE), - outputChannel: alignmentChan, - references: refs, - boss: boss, + boss: boss, + id: graph.GraphID, + graph: graph, + inputChannel: make(chan *graphMinionPair, BUFFERSIZE), } } // start is a method to start the graphMinion running func (graphMinion *graphMinion) start(wg *sync.WaitGroup) { + + // get the correct SAM formatted refs for this graph + graphMinion.references = graphMinion.boss.refSAMheaders[int(graphMinion.id)] + + // go func() { for { @@ -82,7 +84,7 @@ func (graphMinion *graphMinion) start(wg *sync.WaitGroup) { // if an alignment was found, send them and call it a day if len(alignments) != 0 { for _, alignment := range alignments { - graphMinion.outputChannel <- alignment + graphMinion.boss.alignments <- alignment } alignmentFound = true break diff --git a/src/pipeline/sketch.go b/src/pipeline/sketch.go index 716a793..ef4f6a2 100644 --- a/src/pipeline/sketch.go +++ b/src/pipeline/sketch.go @@ -311,15 +311,6 @@ func (proc *ReadMapper) Run() { // set up the boss/minion pool theBoss := newBoss(proc.info, proc.input) - // update the BAM from STDOUT if needed (TODO: not exposed to CLI yet) - if proc.info.Sketch.BAMout != "" { - fh, err := os.Create(proc.info.Sketch.BAMout) - if err != nil { - misc.ErrorCheck(fmt.Errorf("could not open file for BAM writing: %v", err)) - } - theBoss.outFile = fh - } - // run the mapping misc.ErrorCheck(theBoss.mapReads()) diff --git a/src/seqio/seqio.go b/src/seqio/seqio.go index 5521b91..2203311 100644 --- a/src/seqio/seqio.go +++ b/src/seqio/seqio.go @@ -83,26 +83,52 @@ func (Sequence *Sequence) BaseCheck() error { case 'N': Sequence.Seq[i] = byte(base) default: - //return fmt.Errorf("non \"A\\C\\T\\G\\N\\-\" base (%v)", string(FASTQread.Seq[i])) + //return fmt.Errorf("non \"A\\C\\T\\G\\N\\-\" base (%v)", string(r.Seq[i])) Sequence.Seq[i] = byte('N') } } return nil } +// DeepCopy is a method to make a copy of a FASTQread +func (r *FASTQread) DeepCopy() *FASTQread { + newSeq := Sequence{ + ID: make([]byte, len(r.ID)), + Seq: make([]byte, len(r.Seq)), + } + for i := 0; i < len(r.ID); i++ { + newSeq.ID[i] = r.ID[i] + } + for i := 0; i < len(r.Seq); i++ { + newSeq.Seq[i] = r.Seq[i] + } + newFASTQ := FASTQread{ + Sequence: newSeq, + Misc: make([]byte, len(r.Misc)), + Qual: make([]byte, len(r.Qual)), + } + for i := 0; i < len(r.Misc); i++ { + newFASTQ.Misc[i] = r.Misc[i] + } + for i := 0; i < len(r.Qual); i++ { + newFASTQ.Qual[i] = r.Qual[i] + } + return &newFASTQ +} + // RevComplement is a method to reverse complement a sequence held by a FASTQread -// TODO: the quality scores are currently not reversed by this method -func (FASTQread *FASTQread) RevComplement() { - for i, j := 0, len(FASTQread.Seq); i < j; i++ { - FASTQread.Seq[i] = complementBases[FASTQread.Seq[i]] +func (r *FASTQread) RevComplement() { + for i, j := 0, len(r.Seq); i < j; i++ { + r.Seq[i] = complementBases[r.Seq[i]] } - for i, j := 0, len(FASTQread.Seq)-1; i <= j; i, j = i+1, j-1 { - FASTQread.Seq[i], FASTQread.Seq[j] = FASTQread.Seq[j], FASTQread.Seq[i] + for i, j := 0, len(r.Seq)-1; i <= j; i, j = i+1, j-1 { + r.Seq[i], r.Seq[j] = r.Seq[j], r.Seq[i] + r.Qual[i], r.Qual[j] = r.Qual[j], r.Qual[i] } - if FASTQread.RC == true { - FASTQread.RC = false + if r.RC == true { + r.RC = false } else { - FASTQread.RC = true + r.RC = true } } @@ -112,10 +138,10 @@ func (FASTQread *FASTQread) RevComplement() { -2. sum these values across the read and trim at the index where the sum in minimal -3. return the high-quality region */ -func (FASTQread *FASTQread) QualTrim(minQual int) { +func (r *FASTQread) QualTrim(minQual int) { start, qualSum, qualMax := 0, 0, 0 - end := len(FASTQread.Qual) - for i, qual := range FASTQread.Qual { + end := len(r.Qual) + for i, qual := range r.Qual { qualSum += minQual - (int(qual) - encoding) if qualSum < 0 { break @@ -126,8 +152,8 @@ func (FASTQread *FASTQread) QualTrim(minQual int) { } } qualSum, qualMax = 0, 0 - for i, j := 0, len(FASTQread.Qual)-1; j >= i; j-- { - qualSum += minQual - (int(FASTQread.Qual[j]) - encoding) + for i, j := 0, len(r.Qual)-1; j >= i; j-- { + qualSum += minQual - (int(r.Qual[j]) - encoding) if qualSum < 0 { break } @@ -139,8 +165,8 @@ func (FASTQread *FASTQread) QualTrim(minQual int) { if start >= end { start, end = 0, 0 } - FASTQread.Seq = FASTQread.Seq[start:end] - FASTQread.Qual = FASTQread.Qual[start:end] + r.Seq = r.Seq[start:end] + r.Qual = r.Qual[start:end] } // NewFASTQread generates a new fastq read from 4 lines of data From f61e08220ae81ec7880e41588803394a2644eda4 Mon Sep 17 00:00:00 2001 From: Will Rowe Date: Fri, 8 May 2020 13:38:33 +0100 Subject: [PATCH 2/4] adding cli flag to skip exact alignments during the align command --- cmd/align.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/align.go b/cmd/align.go index 5132a30..d90a497 100644 --- a/cmd/align.go +++ b/cmd/align.go @@ -19,6 +19,7 @@ import ( var ( fastq *[]string // list of FASTQ files to align fasta *bool // flag to treat input as fasta sequences + noAlign *bool // flag to prevent exact alignments containmentThreshold *float64 // the containment threshold for the LSH ensemble minKmerCoverage *float64 // the minimum k-mer coverage per base of a segment graphDir *string // directory to save gfa graphs to @@ -42,6 +43,7 @@ var alignCmd = &cobra.Command{ func init() { fastq = alignCmd.Flags().StringSliceP("fastq", "f", []string{}, "FASTQ file(s) to align") fasta = alignCmd.Flags().Bool("fasta", false, "if set, the input will be treated as fasta sequence(s) (experimental feature)") + noAlign = alignCmd.Flags().Bool("noAlign", false, "it set, no exact alignment will be performed - graphs will be weighted using approximated read mappings") containmentThreshold = alignCmd.Flags().Float64P("contThresh", "t", 0.99, "containment threshold for the LSH ensemble") minKmerCoverage = alignCmd.Flags().Float64P("minKmerCov", "c", 1.0, "minimum number of k-mers covering each base of a graph segment") graphDir = alignCmd.PersistentFlags().StringP("graphDir", "g", defaultGraphDir, "directory to save variation graphs to") @@ -115,8 +117,12 @@ func runSketch() { info.Sketch = pipeline.AlignCmd{ Fasta: *fasta, MinKmerCoverage: *minKmerCoverage, + NoExactAlign: *noAlign, } log.Printf("\tcontainment threshold: %.2f\n", info.ContainmentThreshold) + if *noAlign { + log.Printf("\tprevent exact alignments and using approximated mapping only\n") + } // create the pipeline log.Printf("initialising alignment pipeline...") From e2b20b34ff0081992df3b7165c13db198f7eea47 Mon Sep 17 00:00:00 2001 From: Will Rowe Date: Fri, 8 May 2020 13:55:39 +0100 Subject: [PATCH 3/4] fixing help typos --- cmd/align.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/align.go b/cmd/align.go index d90a497..17c1448 100644 --- a/cmd/align.go +++ b/cmd/align.go @@ -43,7 +43,7 @@ var alignCmd = &cobra.Command{ func init() { fastq = alignCmd.Flags().StringSliceP("fastq", "f", []string{}, "FASTQ file(s) to align") fasta = alignCmd.Flags().Bool("fasta", false, "if set, the input will be treated as fasta sequence(s) (experimental feature)") - noAlign = alignCmd.Flags().Bool("noAlign", false, "it set, no exact alignment will be performed - graphs will be weighted using approximated read mappings") + noAlign = alignCmd.Flags().Bool("noAlign", false, "if set, no exact alignment will be performed - graphs will be weighted using approximate read mappings") containmentThreshold = alignCmd.Flags().Float64P("contThresh", "t", 0.99, "containment threshold for the LSH ensemble") minKmerCoverage = alignCmd.Flags().Float64P("minKmerCov", "c", 1.0, "minimum number of k-mers covering each base of a graph segment") graphDir = alignCmd.PersistentFlags().StringP("graphDir", "g", defaultGraphDir, "directory to save variation graphs to") From 5cf7ec86de2aecd7cfc3f77113a41aa7123ec03e Mon Sep 17 00:00:00 2001 From: Will Rowe Date: Fri, 8 May 2020 13:56:06 +0100 Subject: [PATCH 4/4] updating docs for patch release --- docs/conf.py | 2 +- docs/using-groot.md | 4 ++++ src/version/version.go | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index d13924f..510230a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -32,7 +32,7 @@ # The short X.Y version version = '1.1' # The full version, including alpha/beta/rc tags -release = '1.1.0' +release = '1.1.2' # -- General configuration --------------------------------------------------- diff --git a/docs/using-groot.md b/docs/using-groot.md index 245816e..c733399 100644 --- a/docs/using-groot.md +++ b/docs/using-groot.md @@ -120,6 +120,10 @@ gunzip -c file.gz | ./groot align -i grootIndex -p 8 | ./groot report Multiple FASTQ files can be specified as input, however all are treated as the same sample and paired-end info isn't used. To specify multiple files, make sure they are comma separated (`-f fileA.fq,fileB.fq`) or use gunzip/cat with a wildcard (gunzip -c \*.fq.gz | groot...). +Some more flags that can be used: + +- `--noAlign`: if set, no exact alignment will be performed (graphs will still be weighted using approximate read mappings) + ### report The `report` subcommand is used to processes graph traversals and generate a resistome profile for a sample. Here is an example: diff --git a/src/version/version.go b/src/version/version.go index 5ef3dda..30c294e 100644 --- a/src/version/version.go +++ b/src/version/version.go @@ -9,7 +9,7 @@ const major = 1 const minor = 1 // patch is the patch version number -const patch = 1 +const patch = 2 // GetVersion returns the full version string for the current GROOT software func GetVersion() string {