diff --git a/VERSION b/VERSION index 3f44223..e87b0da 100644 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ -1.3.3 +1.3.4 diff --git a/bin/to_piezo_catalogue.py b/bin/to_piezo_catalogue.py index 2f91999..85a7daa 100755 --- a/bin/to_piezo_catalogue.py +++ b/bin/to_piezo_catalogue.py @@ -21,14 +21,18 @@ def mutations_to_catalogue(mutations, filename, ref_genome, catalogue): ref_genome (str): Name of the genome catalogue (str): Name of the catalogue """ - header = "GENBANK_REFERENCE,CATALOGUE_NAME,CATALOGUE_VERSION,CATALOGUE_GRAMMAR,PREDICTION_VALUES,DRUG,MUTATION,PREDICTION,SOURCE,EVIDENCE,OTHER\n" + header = ( + "GENBANK_REFERENCE,CATALOGUE_NAME,CATALOGUE_VERSION,CATALOGUE_GRAMMAR," + "PREDICTION_VALUES,DRUG,MUTATION,PREDICTION,SOURCE,EVIDENCE,OTHER\n" + ) common_all = f"{ref_genome},{catalogue},1.0,GARC1,RUS,NAN," # Get all genes within the mutations genes = sorted(list(set([mutation.split("@")[0] for mutation in mutations]))) with open(filename, "w") as f: f.write(header) - # Piezo requires all prediction values are in the catalogue, and all valid `PREDICTION_VALUES` include `R` + # Piezo requires all prediction values are in the catalogue, + # and all valid `PREDICTION_VALUES` include `R` # So add a dummy record with R as S and U are used elsewhere f.write(common_all + "NOTAGENE@A1!,R,{},{},{}\n") # Add general rules for genes @@ -42,9 +46,10 @@ def mutations_to_catalogue(mutations, filename, ref_genome, catalogue): if __name__ == "__main__": - assert ( - len(sys.argv) == 4 - ), "Incorrect usage. Try: python to_piezo_catalogue.py " + assert len(sys.argv) == 4, ( + "Incorrect usage. Try: python to_piezo_catalogue.py " + " " + ) genome_path = sys.argv[1] vcf_path = sys.argv[2] output_filename = sys.argv[3] diff --git a/gumpy/genome.py b/gumpy/genome.py index c5a1313..c1299b0 100644 --- a/gumpy/genome.py +++ b/gumpy/genome.py @@ -999,34 +999,31 @@ def __add__(self, vcf: VCFFile): # use the calls dict to change the nucleotide indicies in the copy of the genome for idx, type_ in tqdm(vcf.calls.keys(), disable=(not self.show_progress_bar)): - genome.vcf_evidence[genome.nucleotide_index[idx - 1]] = vcf.calls[ - (idx, type_) - ]["original_vcf_row"] - - # deal with changes at a single nucleotide site - if type_ in ["snp", "null", "het"]: - # only set values if the idx is to a single nucleotide - genome.nucleotide_sequence[idx - 1] = vcf.calls[(idx, type_)]["call"] - - # deal with insertions and deletions - elif type_ == "indel": - genome.is_indel[idx - 1] = True - genome.indel_nucleotides[idx - 1] = vcf.calls[(idx, type_)]["call"][1] - - if vcf.calls[(idx, type_)]["call"][0] == "ins": - genome.indel_length[idx - 1] = len( - vcf.calls[(idx, type_)]["call"][1] - ) - else: - genome.indel_length[idx - 1] = -1 * len( - vcf.calls[(idx, type_)]["call"][1] - ) - - elif type_ == "ref": - # These only exist due to reference calls - # They only made it this far as they are required to pull out minors at - # these positions - pass + for item in vcf.calls[(idx, type_)]: + genome.vcf_evidence[genome.nucleotide_index[idx - 1]] = item[ + "original_vcf_row" + ] + + # deal with changes at a single nucleotide site + if type_ in ["snp", "null", "het"]: + # only set values if the idx is to a single nucleotide + genome.nucleotide_sequence[idx - 1] = item["call"] + + # deal with insertions and deletions + elif type_ == "indel": + genome.is_indel[idx - 1] = True + genome.indel_nucleotides[idx - 1] = item["call"][1] + + if item["call"][0] == "ins": + genome.indel_length[idx - 1] = len(item["call"][1]) + else: + genome.indel_length[idx - 1] = -1 * len(item["call"][1]) + + elif type_ == "ref": + # These only exist due to reference calls + # They only made it this far as they are required to pull + # out minors at these positions + pass genome.minor_populations = [] diff --git a/gumpy/variantfile.py b/gumpy/variantfile.py index 2af8f3f..da8318c 100644 --- a/gumpy/variantfile.py +++ b/gumpy/variantfile.py @@ -402,111 +402,112 @@ def _find_minor_populations(self): self.minor_populations = [] simple_calls = [] for idx, type_ in self.calls.keys(): - # Get the simple format of this call for comparison - if isinstance(self.calls[(idx, type_)]["call"], tuple): - # Indels - t = self.calls[(idx, type_)]["call"][0] - bases = self.calls[(idx, type_)]["call"][1] - pos = self.calls[(idx, type_)]["pos"] - else: - # Snps - if self.calls[(idx, type_)]["call"] == "x": - # Null calls shouldn't have minor populations - continue - t = "snp" - bases = ( - self.calls[(idx, type_)]["ref"], - self.calls[(idx, type_)]["call"], - ) - pos = self.calls[(idx, type_)]["pos"] - simple_calls.append((idx, pos, t, bases)) + for item in self.calls[(idx, type_)]: + # Get the simple format of this call for comparison + if isinstance(item["call"], tuple): + # Indels + t = item["call"][0] + bases = item["call"][1] + pos = item["pos"] + else: + # Snps + if item["call"] == "x": + # Null calls shouldn't have minor populations + continue + t = "snp" + bases = ( + item["ref"], + item["call"], + ) + pos = item["pos"] + simple_calls.append((idx, pos, t, bases)) seen = set() for idx, type_ in tqdm(self.calls.keys()): - # Check if we've delt with this vcf already - if str(self.calls[(idx, type_)]["original_vcf_row"]) in seen: - continue - else: - seen.add(str(self.calls[(idx, type_)]["original_vcf_row"])) - - # Pull out depth tag from the specific row's format fields - # as the file metadata isn't a guarantee of the actual fields of this row - allelic_depth_tag = ( - "COV" - if self.calls[(idx, type_)]["original_vcf_row"].get("COV", None) - else "AD" - ) + for item in self.calls[(idx, type_)]: + # Check if we've delt with this vcf already + if str(item["original_vcf_row"]) in seen: + continue + else: + seen.add(str(item["original_vcf_row"])) - # Checking for het calls - if self.calls[(idx, type_)]["call"] == "z": - if 0 not in self.calls[(idx, type_)]["original_vcf_row"]["GT"]: - # Het call without a wildtype call, so warn about weird behaviour - warnings.warn( - f"Minor population detected at position {idx}, which doesn't " - f"include a wildtype call. Call: " - f"{self.calls[(idx, type_)]['original_vcf_row']['GT']}. " - "Note that there may be multiple mutations given at this index", - UserWarning, - ) + # Pull out depth tag from the specific row's format fields + # as the file metadata isn't a guarantee of the actual + # fields of this row + allelic_depth_tag = ( + "COV" if item["original_vcf_row"].get("COV", None) else "AD" + ) - # Reference base(s) - ref = self.calls[(idx, type_)]["original_vcf_row"]["REF"] + # Checking for het calls + if item["call"] == "z": + if 0 not in item["original_vcf_row"]["GT"]: + # Het call without a wildtype call, so warn about + # behaviour + warnings.warn( + f"Minor population detected at position {idx}, which " + f"doesn't include a wildtype call. Call: " + f"{item['original_vcf_row']['GT']}. Note that there " + "may be multiple mutations given at this index", + UserWarning, + ) - if self.calls[(idx, type_)]["original_vcf_row"]["ALTS"] is None: - # Case arrises from gvcf ref calls not giving any alts - calls = [self.calls[(idx, type_)]["original_vcf_row"]["REF"]] - else: - # Get all of the calls - calls = [self.calls[(idx, type_)]["original_vcf_row"]["REF"]] + list( - self.calls[(idx, type_)]["original_vcf_row"]["ALTS"] - ) + # Reference base(s) + ref = item["original_vcf_row"]["REF"] - # Break down the calls as appropriate - simple = [self._simplify_call(ref, alt) for alt in calls] + if item["original_vcf_row"]["ALTS"] is None: + # Case arrises from gvcf ref calls not giving any alts + calls = [item["original_vcf_row"]["REF"]] + else: + # Get all of the calls + calls = [item["original_vcf_row"]["REF"]] + list( + item["original_vcf_row"]["ALTS"] + ) - # Map each call to the corresponding read depth - dps = list(self.calls[(idx, type_)]["original_vcf_row"][allelic_depth_tag]) + # Break down the calls as appropriate + simple = [self._simplify_call(ref, alt) for alt in calls] - # GVCF null calls get None for depth, so catch (and skip) this - if dps == [None]: - continue - else: - total_depth = sum(dps) - - # idx here refers to the position of this call, NOT this vcf row, so adjust - # to avoid shifting when building minor calls - original_idx = idx - idx = idx - self.calls[(idx, type_)]["pos"] - for calls, depth in zip(simple, dps): - # As we can have >1 call per simple, iter - for call in calls: - # Check that this call isn't one of the actual calls - if (idx + int(call[0]), *call) in simple_calls: - # Is an actual call so we skip - continue - # Check if there are >=2 reads to support this call - if depth >= 2: - # These are minor calls!! - pos = idx + int(call[0]) - if pos not in self.minor_population_indices: - # We don't actually care though - # This has to be done here as simplifying calls can move - # the position - continue - if call[1] == "snp" and call[2][0] == call[2][1]: - # Ref calls aren't interesting + # Map each call to the corresponding read depth + dps = list(item["original_vcf_row"][allelic_depth_tag]) + + # GVCF null calls get None for depth, so catch (and skip) this + if dps == [None]: + continue + else: + total_depth = sum(dps) + + # idx here refers to the position of this call, NOT this vcf row, + # so adjust to avoid shifting when building minor calls + idx = idx - item["pos"] + for calls, depth in zip(simple, dps): + # As we can have >1 call per simple, iter + for call in calls: + # Check that this call isn't one of the actual calls + if (idx + int(call[0]), *call) in simple_calls: + # Is an actual call so we skip continue - # Only tracking absolute number of reads - self.minor_populations.append( - ( - pos, - call[1], - call[2], - int(depth), - round(depth / total_depth, 3), - self.calls[(original_idx, type_)]["original_vcf_row"], + # Check if there are >=2 reads to support this call + if depth >= 2: + # These are minor calls!! + pos = idx + int(call[0]) + if pos not in self.minor_population_indices: + # We don't actually care though + # This has to be done here as simplifying calls can move + # the position + continue + if call[1] == "snp" and call[2][0] == call[2][1]: + # Ref calls aren't interesting + continue + # Only tracking absolute number of reads + self.minor_populations.append( + ( + pos, + call[1], + call[2], + int(depth), + round(depth / total_depth, 3), + item["original_vcf_row"], + ) ) - ) def __find_calls(self): """ @@ -515,7 +516,7 @@ def __find_calls(self): Creates calls dict used elsewhere. """ - self.calls = {} + self.calls = defaultdict(list) for record in self.records: # VCF files are 1 indexed but keep for now @@ -590,10 +591,13 @@ def __find_calls(self): vcf_info["ALTS"] = record.alts metadata["original_vcf_row"] = vcf_info if self.calls.get((index + counter, variant_type)) is not None: - raise ValueError( - "Multiple calls at position " + str(index) + " in VCF file" + warnings.warn( + UserWarning( + f"Multiple calls at position {index}" + + f" with type {variant_type} in VCF file" + ) ) - self.calls[(index + counter, variant_type)] = metadata + self.calls[(index + counter, variant_type)].append(metadata) # otherwise the REF, ALT pair are different lengths else: @@ -613,12 +617,13 @@ def __find_calls(self): vcf_info["ALTS"] = record.alts metadata["original_vcf_row"] = vcf_info if self.calls.get((index + p, "indel")) is not None: - raise ValueError( - "Multiple calls at position " - + str(index) - + " in VCF file" + warnings.warn( + UserWarning( + f"Multiple calls at position {index}" + + " with type indel in VCF file" + ) ) - self.calls[(index + p, "indel")] = metadata + self.calls[(index + p, "indel")].append(metadata) else: metadata = {} @@ -631,12 +636,13 @@ def __find_calls(self): vcf_info["ALTS"] = record.alts metadata["original_vcf_row"] = vcf_info if self.calls.get((index + p, variant_type)) is not None: - raise ValueError( - "Multiple calls at position " - + str(index) - + " in VCF file" + warnings.warn( + UserWarning( + f"Multiple calls at position {index}" + + f" with type {variant_type} in VCF file" + ) ) - self.calls[(index + p, variant_type)] = metadata + self.calls[(index + p, variant_type)].append(metadata) def _simplify_call(self, ref: str, alt: str) -> List[Tuple[int, str, str]]: """Private method to simplify a complex call into one indel and multiple SNPs. @@ -808,70 +814,71 @@ def __get_variants(self): to_drop = [] for index, type_ in sorted(list(self.calls.keys())): - call = self.calls[(index, type_)]["call"] - alt = call - ref = self.calls[(index, type_)]["ref"] - if ref == alt and self.bypass_reference_calls: - # If we have a call at a position which the alt is a reference call - # we should only care if we haven't tried to bypass ref calls - # Have to check here too i.e CCC->ATC will have a ref call at pos 3 - to_drop.append((index, type_)) - continue - indices.append(index) - refs.append(ref) - pos = self.calls[(index, type_)]["pos"] - positions.append(pos) - # Update the masks with the appropriate types - if type_ == "indel": - # Convert to ins_x or del_x rather than tuple - variant = str(index) + "_" + call[0] + "_" + str(call[1]) - alt = call[1] - is_indel.append(True) - if call[0] == "ins": - indel_length.append(len(call[1])) - else: - indel_length.append(-1 * len(call[1])) - is_snp.append(False) - is_het.append(False) - is_null.append(False) - elif type_ == "snp": - variant = str(index) + ref + ">" + call - is_indel.append(False) - indel_length.append(0) - is_snp.append(True) - is_het.append(False) - is_null.append(False) - elif type_ == "het": - variant = str(index) + ref + ">" + call - is_indel.append(False) - indel_length.append(0) - is_snp.append(False) - is_het.append(True) - is_null.append(False) - elif type_ == "null": - variant = str(index) + ref + ">" + call - is_indel.append(False) - indel_length.append(0) - is_snp.append(False) - is_het.append(False) - is_null.append(True) - elif type_ == "ref": - variant = str(index) + ref + ">" + ref - is_indel.append(False) - indel_length.append(0) - is_snp.append(False) - is_het.append(False) - is_null.append(False) - alts.append(alt) - variants.append(variant) - for key in self.calls[(index, type_)]["original_vcf_row"]: - metadata[key].append( - self.calls[(index, type_)]["original_vcf_row"][key] - ) + for idx, item in enumerate(self.calls[(index, type_)]): + call = item["call"] + alt = call + ref = item["ref"] + if ref == alt and self.bypass_reference_calls: + # If we have a call at a position which the alt is a reference call + # we should only care if we haven't tried to bypass ref calls + # Have to check here too i.e CCC->ATC will have a ref call at pos 3 + to_drop.append((idx, (index, type_))) + continue + indices.append(index) + refs.append(ref) + pos = item["pos"] + positions.append(pos) + # Update the masks with the appropriate types + if type_ == "indel": + # Convert to ins_x or del_x rather than tuple + variant = str(index) + "_" + call[0] + "_" + str(call[1]) + alt = call[1] + is_indel.append(True) + if call[0] == "ins": + indel_length.append(len(call[1])) + else: + indel_length.append(-1 * len(call[1])) + is_snp.append(False) + is_het.append(False) + is_null.append(False) + elif type_ == "snp": + variant = str(index) + ref + ">" + call + is_indel.append(False) + indel_length.append(0) + is_snp.append(True) + is_het.append(False) + is_null.append(False) + elif type_ == "het": + variant = str(index) + ref + ">" + call + is_indel.append(False) + indel_length.append(0) + is_snp.append(False) + is_het.append(True) + is_null.append(False) + elif type_ == "null": + variant = str(index) + ref + ">" + call + is_indel.append(False) + indel_length.append(0) + is_snp.append(False) + is_het.append(False) + is_null.append(True) + elif type_ == "ref": + variant = str(index) + ref + ">" + ref + is_indel.append(False) + indel_length.append(0) + is_snp.append(False) + is_het.append(False) + is_null.append(False) + alts.append(alt) + variants.append(variant) + for key in item["original_vcf_row"]: + metadata[key].append(item["original_vcf_row"][key]) # Remove ref calls as required - for key in to_drop: - del self.calls[key] + for idx, key in to_drop: + del self.calls[key][idx] + if len(self.calls[key]) == 0: + del self.calls[key] # Convert to numpy arrays for neat indexing self.alt_nucleotides = numpy.array(alts) diff --git a/tests/unit/test_functions.py b/tests/unit/test_functions.py index 561e3cf..e8c4ae6 100644 --- a/tests/unit/test_functions.py +++ b/tests/unit/test_functions.py @@ -1612,8 +1612,8 @@ def test_large_deletions(): diff = sample - sample2 - c1 = {key[0]: vcf.calls[key]["original_vcf_row"] for key in vcf.calls.keys()} - c2 = {key[0]: vcf2.calls[key]["original_vcf_row"] for key in vcf2.calls.keys()} + c1 = {key[0]: vcf.calls[key][0]["original_vcf_row"] for key in vcf.calls.keys()} + c2 = {key[0]: vcf2.calls[key][0]["original_vcf_row"] for key in vcf2.calls.keys()} for idx, evidence in zip(diff.nucleotide_index, diff.vcf_evidences): if idx in c1.keys() and idx in c2.keys(): diff --git a/tests/unit/test_instantiation.py b/tests/unit/test_instantiation.py index e13042a..b113753 100644 --- a/tests/unit/test_instantiation.py +++ b/tests/unit/test_instantiation.py @@ -2489,7 +2489,7 @@ def test_instanciate_vcf(): } # could use assertDictEqual from unittest framework, but not using at present - assert vcf.calls == calls + assert {key: vcf.calls[key][0] for key in vcf.calls.keys()} == calls # Testing record objects # Features common to all record objects: diff --git a/tests/unit/test_minor_populations.py b/tests/unit/test_minor_populations.py index f4cd2a7..4fabd2e 100644 --- a/tests/unit/test_minor_populations.py +++ b/tests/unit/test_minor_populations.py @@ -30,13 +30,26 @@ def make_reproducable(list_: [str]) -> [str]: def test_double_del(): - """Testing for expected crashes with deletions at the same base""" - with pytest.raises(ValueError): - gumpy.VCFFile( + """Testing for picking up deletions at the same base""" + ref = gumpy.Genome("config/TEST-DNA.gbk") + with pytest.warns(UserWarning): + vcf = gumpy.VCFFile( "tests/test-cases/TEST-DNA-double-del.vcf", ignore_filter=True, minor_population_indices={3, 4, 5, 6, 7}, ) + sample = ref + vcf + + ref_a = ref.build_gene("A") + sample_a = sample.build_gene("A") + + diff = ref_a - sample_a + assert sorted(diff.minor_populations()) == sorted( + ["1_del_aa:3", "2_indel:3", "3_del_a:3"] + ) + assert sorted(sample_a.minority_populations_GARC()) == sorted( + diff.minor_populations() + ) @pytest.mark.slow