Skip to content

Commit

Permalink
Merge pull request #42 from will-rowe/babygroot
Browse files Browse the repository at this point in the history
patch for data race
  • Loading branch information
Will Rowe authored May 11, 2020
2 parents 4526946 + 5cf7ec8 commit 1be76fa
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 66 deletions.
6 changes: 6 additions & 0 deletions cmd/align.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
var (
fastq *[]string // list of FASTQ files to align
fasta *bool // flag to treat input as fasta sequences
noAlign *bool // flag to prevent exact alignments
containmentThreshold *float64 // the containment threshold for the LSH ensemble
minKmerCoverage *float64 // the minimum k-mer coverage per base of a segment
graphDir *string // directory to save gfa graphs to
Expand All @@ -42,6 +43,7 @@ var alignCmd = &cobra.Command{
func init() {
fastq = alignCmd.Flags().StringSliceP("fastq", "f", []string{}, "FASTQ file(s) to align")
fasta = alignCmd.Flags().Bool("fasta", false, "if set, the input will be treated as fasta sequence(s) (experimental feature)")
noAlign = alignCmd.Flags().Bool("noAlign", false, "if set, no exact alignment will be performed - graphs will be weighted using approximate read mappings")
containmentThreshold = alignCmd.Flags().Float64P("contThresh", "t", 0.99, "containment threshold for the LSH ensemble")
minKmerCoverage = alignCmd.Flags().Float64P("minKmerCov", "c", 1.0, "minimum number of k-mers covering each base of a graph segment")
graphDir = alignCmd.PersistentFlags().StringP("graphDir", "g", defaultGraphDir, "directory to save variation graphs to")
Expand Down Expand Up @@ -115,8 +117,12 @@ func runSketch() {
info.Sketch = pipeline.AlignCmd{
Fasta: *fasta,
MinKmerCoverage: *minKmerCoverage,
NoExactAlign: *noAlign,
}
log.Printf("\tcontainment threshold: %.2f\n", info.ContainmentThreshold)
if *noAlign {
log.Printf("\tprevent exact alignments and using approximated mapping only\n")
}

// create the pipeline
log.Printf("initialising alignment pipeline...")
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
# The short X.Y version
version = '1.1'
# The full version, including alpha/beta/rc tags
release = '1.1.0'
release = '1.1.2'


# -- General configuration ---------------------------------------------------
Expand Down
4 changes: 4 additions & 0 deletions docs/using-groot.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ gunzip -c file.gz | ./groot align -i grootIndex -p 8 | ./groot report

Multiple FASTQ files can be specified as input, however all are treated as the same sample and paired-end info isn't used. To specify multiple files, make sure they are comma separated (`-f fileA.fq,fileB.fq`) or use gunzip/cat with a wildcard (gunzip -c \*.fq.gz | groot...).

Some more flags that can be used:

- `--noAlign`: if set, no exact alignment will be performed (graphs will still be weighted using approximate read mappings)

### report

The `report` subcommand is used to processes graph traversals and generate a resistome profile for a sample. Here is an example:
Expand Down
77 changes: 54 additions & 23 deletions src/pipeline/boss.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package pipeline

import (
"fmt"
"io"
"log"
"os"
"sync"
"time"
Expand All @@ -20,7 +20,7 @@ type theBoss struct {
refSAMheaders map[int][]*sam.Reference // map of SAM headers for each reference sequence, indexed by path ID
reads chan *seqio.FASTQread // the boss uses this channel to receive data from the main sketching pipeline
alignments chan *sam.Record // used to receive alignments from the graph minions
outFile io.Writer // destination for the BAM output
bamwriter *bam.Writer // destination for the BAM output
receivedReadCount int // the number of reads the boss is sent during it's lifetime
mappedCount int // the total number of reads that were successful mapped to at least one graph
multimappedCount int // the total number of reads that had mappings to multiple graphs
Expand All @@ -33,24 +33,23 @@ func newBoss(runtimeInfo *Info, inputChan chan *seqio.FASTQread) *theBoss {
return &theBoss{
info: runtimeInfo,
reads: inputChan,
outFile: os.Stdout,
alignments: make(chan *sam.Record, BUFFERSIZE),
receivedReadCount: 0,
mappedCount: 0,
multimappedCount: 0,
alignmentCount: 0,
}
}

// mapReads is a method to start off the minions to map and align reads, the minions to augment graphs, and collate the alignments
func (theBoss *theBoss) mapReads() error {
// setupBAM will set up the BAM STDOUT for reporting exact graph alignments
func (theBoss *theBoss) setupBAM() error {

// get the SAM headers
samHeaders, err := theBoss.info.Store.GetSAMrefs()
if err != nil {
return err
}
theBoss.refSAMheaders = samHeaders
theBoss.alignments = make(chan *sam.Record, BUFFERSIZE)

// get program info for SAM header (unique ID, name, command, previous program ID, version)
programInfo := sam.NewProgram("1", "groot", "groot align", "", version.GetVersion())
Expand Down Expand Up @@ -84,27 +83,52 @@ func (theBoss *theBoss) mapReads() error {
return err
}

// use a BAM file or STDOUT (TODO: not exposed to CLI yet)
var fh io.Writer
if theBoss.info.Sketch.BAMout != "" {
var err error
fh, err = os.Create(theBoss.info.Sketch.BAMout)
if err != nil {
return (fmt.Errorf("could not open file for BAM writing: %v", err))
}
} else {
fh = os.Stdout
}

// create the bam writer and write the header
bw, err := bam.NewWriter(theBoss.outFile, header, 0)
bw, err := bam.NewWriter(fh, header, 0)
if err != nil {
return err
}
theBoss.bamwriter = bw
return nil
}

// mapReads is a method to start off the minions to map and align reads, the minions to augment graphs, and collate the alignments
func (theBoss *theBoss) mapReads() error {
theBoss.alignments = make(chan *sam.Record, BUFFERSIZE)

// set up the BAM if exact alignment is requested
if !theBoss.info.Sketch.NoExactAlign {
if err := theBoss.setupBAM(); err != nil {
return err
}
}

// setup the waitgroups for the sketching and graphing minions
var wg1 sync.WaitGroup
var wg2 sync.WaitGroup

// launch the graph minions (one minion per graph in the index)
theBoss.graphMinionRegister = make([]*graphMinion, len(theBoss.info.Store))
for graphID, graph := range theBoss.info.Store {
for _, graph := range theBoss.info.Store {

// create, start and register the graph minion
minion := newGraphMinion(graphID, graph, theBoss.alignments, theBoss.refSAMheaders[int(graphID)], theBoss)
minion := newGraphMinion(theBoss, graph)
wg2.Add(1)
minion.start(&wg2)
theBoss.graphMinionRegister[graphID] = minion
theBoss.graphMinionRegister[graph.GraphID] = minion
}
log.Printf("\tgraph workers: %d", len(theBoss.graphMinionRegister))

// launch the sketching minions (one per CPU)
for i := 0; i < theBoss.info.NumProc; i++ {
Expand Down Expand Up @@ -149,18 +173,21 @@ func (theBoss *theBoss) mapReads() error {
if err != nil {
panic(err)
}
for graphID, hits := range results {

// make a copy of the read
readCopy := seqio.FASTQread{
Sequence: read.Sequence,
Misc: read.Misc,
Qual: read.Qual,
RC: read.RC,
}
// if multiple graphs are returned, we need to deep copy the read
deepCopy := false
if len(results) > 1 {
deepCopy = true
}

// send the windows to the correct go routine for read alignment and graph augmentation
theBoss.graphMinionRegister[graphID].inputChannel <- &graphMinionPair{hits, readCopy}
// augment graphs and optionally perform exact alignment
for graphID, hits := range results {
if deepCopy {
readCopy := *read.DeepCopy()
theBoss.graphMinionRegister[graphID].inputChannel <- &graphMinionPair{hits, readCopy}
} else {
theBoss.graphMinionRegister[graphID].inputChannel <- &graphMinionPair{hits, *read}
}
}

// update counts
Expand Down Expand Up @@ -201,11 +228,15 @@ func (theBoss *theBoss) mapReads() error {
// os.Exit(1)
//}
theBoss.alignmentCount++
if err := bw.Write(record); err != nil {
if err := theBoss.bamwriter.Write(record); err != nil {
return err
}
}

// close the bam writer and return to the completed boss to the pipeline
return bw.Close()
var err error
if !theBoss.info.Sketch.NoExactAlign {
err = theBoss.bamwriter.Close()
}
return err
}
32 changes: 17 additions & 15 deletions src/pipeline/graphminion.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,31 @@ type graphMinionPair struct {

// graphMinion holds a graph and is responsible for augmenting the paths when new mapping data arrives
type graphMinion struct {
id uint32
graph *graph.GrootGraph
inputChannel chan *graphMinionPair
outputChannel chan *sam.Record
runAlignment bool
references []*sam.Reference // the SAM references for each path in this graph
boss *theBoss // pointer to the boss so the minion can access the runtime info (e.g. k-mer size)
boss *theBoss // pointer to the boss so the minion can access the runtime info (e.g. k-mer size) and channels etc
id uint32 // id corresponds to the graphID
graph *graph.GrootGraph
inputChannel chan *graphMinionPair
runAlignment bool
references []*sam.Reference // the SAM references for each path in this graph
}

// newGraphMinion is the constructor function
func newGraphMinion(id uint32, graph *graph.GrootGraph, alignmentChan chan *sam.Record, refs []*sam.Reference, boss *theBoss) *graphMinion {
func newGraphMinion(boss *theBoss, graph *graph.GrootGraph) *graphMinion {
return &graphMinion{
id: id,
graph: graph,
inputChannel: make(chan *graphMinionPair, BUFFERSIZE),
outputChannel: alignmentChan,
references: refs,
boss: boss,
boss: boss,
id: graph.GraphID,
graph: graph,
inputChannel: make(chan *graphMinionPair, BUFFERSIZE),
}
}

// start is a method to start the graphMinion running
func (graphMinion *graphMinion) start(wg *sync.WaitGroup) {

// get the correct SAM formatted refs for this graph
graphMinion.references = graphMinion.boss.refSAMheaders[int(graphMinion.id)]

//
go func() {
for {

Expand Down Expand Up @@ -82,7 +84,7 @@ func (graphMinion *graphMinion) start(wg *sync.WaitGroup) {
// if an alignment was found, send them and call it a day
if len(alignments) != 0 {
for _, alignment := range alignments {
graphMinion.outputChannel <- alignment
graphMinion.boss.alignments <- alignment
}
alignmentFound = true
break
Expand Down
9 changes: 0 additions & 9 deletions src/pipeline/sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,6 @@ func (proc *ReadMapper) Run() {
// set up the boss/minion pool
theBoss := newBoss(proc.info, proc.input)

// update the BAM from STDOUT if needed (TODO: not exposed to CLI yet)
if proc.info.Sketch.BAMout != "" {
fh, err := os.Create(proc.info.Sketch.BAMout)
if err != nil {
misc.ErrorCheck(fmt.Errorf("could not open file for BAM writing: %v", err))
}
theBoss.outFile = fh
}

// run the mapping
misc.ErrorCheck(theBoss.mapReads())

Expand Down
60 changes: 43 additions & 17 deletions src/seqio/seqio.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,52 @@ func (Sequence *Sequence) BaseCheck() error {
case 'N':
Sequence.Seq[i] = byte(base)
default:
//return fmt.Errorf("non \"A\\C\\T\\G\\N\\-\" base (%v)", string(FASTQread.Seq[i]))
//return fmt.Errorf("non \"A\\C\\T\\G\\N\\-\" base (%v)", string(r.Seq[i]))
Sequence.Seq[i] = byte('N')
}
}
return nil
}

// DeepCopy is a method to make a copy of a FASTQread
func (r *FASTQread) DeepCopy() *FASTQread {
newSeq := Sequence{
ID: make([]byte, len(r.ID)),
Seq: make([]byte, len(r.Seq)),
}
for i := 0; i < len(r.ID); i++ {
newSeq.ID[i] = r.ID[i]
}
for i := 0; i < len(r.Seq); i++ {
newSeq.Seq[i] = r.Seq[i]
}
newFASTQ := FASTQread{
Sequence: newSeq,
Misc: make([]byte, len(r.Misc)),
Qual: make([]byte, len(r.Qual)),
}
for i := 0; i < len(r.Misc); i++ {
newFASTQ.Misc[i] = r.Misc[i]
}
for i := 0; i < len(r.Qual); i++ {
newFASTQ.Qual[i] = r.Qual[i]
}
return &newFASTQ
}

// RevComplement is a method to reverse complement a sequence held by a FASTQread
// TODO: the quality scores are currently not reversed by this method
func (FASTQread *FASTQread) RevComplement() {
for i, j := 0, len(FASTQread.Seq); i < j; i++ {
FASTQread.Seq[i] = complementBases[FASTQread.Seq[i]]
func (r *FASTQread) RevComplement() {
for i, j := 0, len(r.Seq); i < j; i++ {
r.Seq[i] = complementBases[r.Seq[i]]
}
for i, j := 0, len(FASTQread.Seq)-1; i <= j; i, j = i+1, j-1 {
FASTQread.Seq[i], FASTQread.Seq[j] = FASTQread.Seq[j], FASTQread.Seq[i]
for i, j := 0, len(r.Seq)-1; i <= j; i, j = i+1, j-1 {
r.Seq[i], r.Seq[j] = r.Seq[j], r.Seq[i]
r.Qual[i], r.Qual[j] = r.Qual[j], r.Qual[i]
}
if FASTQread.RC == true {
FASTQread.RC = false
if r.RC == true {
r.RC = false
} else {
FASTQread.RC = true
r.RC = true
}
}

Expand All @@ -112,10 +138,10 @@ func (FASTQread *FASTQread) RevComplement() {
-2. sum these values across the read and trim at the index where the sum in minimal
-3. return the high-quality region
*/
func (FASTQread *FASTQread) QualTrim(minQual int) {
func (r *FASTQread) QualTrim(minQual int) {
start, qualSum, qualMax := 0, 0, 0
end := len(FASTQread.Qual)
for i, qual := range FASTQread.Qual {
end := len(r.Qual)
for i, qual := range r.Qual {
qualSum += minQual - (int(qual) - encoding)
if qualSum < 0 {
break
Expand All @@ -126,8 +152,8 @@ func (FASTQread *FASTQread) QualTrim(minQual int) {
}
}
qualSum, qualMax = 0, 0
for i, j := 0, len(FASTQread.Qual)-1; j >= i; j-- {
qualSum += minQual - (int(FASTQread.Qual[j]) - encoding)
for i, j := 0, len(r.Qual)-1; j >= i; j-- {
qualSum += minQual - (int(r.Qual[j]) - encoding)
if qualSum < 0 {
break
}
Expand All @@ -139,8 +165,8 @@ func (FASTQread *FASTQread) QualTrim(minQual int) {
if start >= end {
start, end = 0, 0
}
FASTQread.Seq = FASTQread.Seq[start:end]
FASTQread.Qual = FASTQread.Qual[start:end]
r.Seq = r.Seq[start:end]
r.Qual = r.Qual[start:end]
}

// NewFASTQread generates a new fastq read from 4 lines of data
Expand Down
2 changes: 1 addition & 1 deletion src/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const major = 1
const minor = 1

// patch is the patch version number
const patch = 1
const patch = 2

// GetVersion returns the full version string for the current GROOT software
func GetVersion() string {
Expand Down

0 comments on commit 1be76fa

Please sign in to comment.