+
1 #!/usr/bin/env python3
+
+
2
+
+
3 """Module containing the HelParTimeSeries class and the command line interface."""
+
+
4
+
+
5 import argparse
+
+
6 import re
+
+
7 import zipfile
+
+
8 from pathlib import Path
+
+
9 from typing import Optional
+
+
10
+
+
11 import matplotlib.pyplot as plt
+
+
12 import pandas as pd
+
+
13 from biobb_common.configuration import settings
+
+
14 from biobb_common.generic.biobb_object import BiobbObject
+
+
15 from biobb_common.tools import file_utils as fu
+
+
16 from biobb_common.tools.file_utils import launchlogger
+
+
17
+
+
18 from biobb_dna.utils import constants
+
+
19 from biobb_dna.utils.common import _from_string_to_list
+
+
20 from biobb_dna.utils.loader import read_series
+
+
21
+
+
22
+
+
23 class HelParTimeSeries(BiobbObject):
+
+
24 """
+
+
25 | biobb_dna HelParTimeSeries
+
+
26 | Created time series and histogram plots for each base pair from a helical parameter series file.
+
+
27 | The helical parameter series file is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair.
+
+
28
+
+
29 Args:
+
+
30 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
+
+
31 output_zip_path (str): Path to output .zip files where data is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/timeseries_output.zip>`_. Accepted formats: zip (edam:format_3987).
+
+
32 properties (dict):
+
+
33 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *usecols* option).
+
+
34 * **bins** (*int*) - (None) Bins for histogram. Parameter has same options as matplotlib.pyplot.hist.
+
+
35 * **helpar_name** (*str*) - (None) Helical parameter name. It must match the name of the helical parameter in the .ser input file. Values: majd, majw, mind, minw, inclin, tip, xdisp, ydisp, shear, stretch, stagger, buckle, propel, opening, rise, roll, twist, shift, slide, tilt, alphaC, alphaW, betaC, betaW, gammaC, gammaW, deltaC, deltaW, epsilC, epsilW, zetaC, zetaW, chiC, chiW, phaseC, phaseW.
+
+
36 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series.
+
+
37 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 1) to analyze. If not specified it will analyse the complete sequence.
+
+
38 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
+
+
39 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
+
+
40 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
+
+
41
+
+
42 Examples:
+
+
43 This is a use example of how to use the building block from Python::
+
+
44
+
+
45 from biobb_dna.dna.dna_timeseries import dna_timeseries
+
+
46
+
+
47 prop = {
+
+
48 'helpar_name': 'twist',
+
+
49 'seqpos': [1,2,3,4,5],
+
+
50 'sequence': 'GCAACGTGCTATGGAAGC',
+
+
51 }
+
+
52 dna_timeseries(
+
+
53 input_ser_path='/path/to/twist.ser',
+
+
54 output_zip_path='/path/to/output/file.zip'
+
+
55 properties=prop)
+
+
56 Info:
+
+
57 * wrapped_software:
+
+
58 * name: In house
+
+
59 * license: Apache-2.0
+
+
60 * ontology:
+
+
61 * name: EDAM
+
+
62 * schema: http://edamontology.org/EDAM.owl
+
+
63
+
+
64 """
+
+
65
+
+
66 def __init__(
+
+
67 self, input_ser_path, output_zip_path, properties=None, **kwargs
+
+
68 ) -> None:
+
+
69 properties = properties or {}
+
+
70
+
+
71 # Call parent class constructor
+
+
72 super().__init__(properties)
+
+
73 self.locals_var_dict = locals().copy()
+
+
74
+
+
75 # Input/Output files
+
+
76 self.io_dict = {
+
+
77 "in": {
+
+
78 "input_ser_path": input_ser_path,
+
+
79 },
+
+
80 "out": {"output_zip_path": output_zip_path},
+
+
81 }
+
+
82
+
+
83 self.properties = properties
+
+
84 self.sequence = properties.get("sequence", None)
+
+
85 self.bins = properties.get("bins", "auto")
+
+
86 self.stride = properties.get("stride", 10)
+
+
87 self.seqpos = [
+
+
88 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
+
+
89 ]
+
+
90 self.helpar_name = properties.get("helpar_name", None)
+
+
91
+
+
92 # get helical parameter from filename if not specified
+
+
93 if self.helpar_name is None:
+
+
94 for hp in constants.helical_parameters:
+
+
95 if hp.lower() in Path(input_ser_path).name.lower():
+
+
96 self.helpar_name = hp
+
+
97 if self.helpar_name is None:
+
+
98 raise ValueError(
+
+
99 "Helical parameter name can't be inferred from file, "
+
+
100 "so it must be specified!"
+
+
101 )
+
+
102 else:
+
+
103 if self.helpar_name not in constants.helical_parameters:
+
+
104 raise ValueError(
+
+
105 "Helical parameter name is invalid! "
+
+
106 f"Options: {constants.helical_parameters}"
+
+
107 )
+
+
108
+
+
109 # get base length from helical parameter name
+
+
110 if self.helpar_name.lower() in constants.hp_singlebases:
+
+
111 self.baselen = 0
+
+
112 else:
+
+
113 self.baselen = 1
+
+
114 # get unit from helical parameter name
+
+
115 if self.helpar_name in constants.hp_angular:
+
+
116 self.hp_unit = "Degrees"
+
+
117 else:
+
+
118 self.hp_unit = "Angstroms"
+
+
119
+
+
120 # Check the properties
+
+
121 self.check_properties(properties)
+
+
122 self.check_arguments()
+
+
123
+
+
124 @launchlogger
+
+
125 def launch(self) -> int:
+
+
126 """Execute the :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` object."""
+
+
127
+
+
128 # Setup Biobb
+
+
129 if self.check_restart():
+
+
130 return 0
+
+
131 self.stage_files()
+
+
132
+
+
133 # check sequence
+
+
134 if self.sequence is None or len(self.sequence) < 2:
+
+
135 raise ValueError("sequence is null or too short!")
+
+
136
+
+
137 # calculate cols with 0 index
+
+
138 if self.seqpos:
+
+
139 cols = [i - 1 for i in self.seqpos]
+
+
140 else:
+
+
141 cols = list(range(len(self.sequence)))
+
+
142
+
+
143 # sort cols in ascending order
+
+
144 cols.sort()
+
+
145
+
+
146 # check seqpos for base pairs
+
+
147 if self.seqpos and self.helpar_name in constants.hp_basepairs:
+
+
148 if (max(cols) > len(self.sequence) - 2) or (min(cols) < 0):
+
+
149 raise ValueError(
+
+
150 f"seqpos values must be between 1 and {len(self.sequence) - 1}"
+
+
151 )
+
+
152 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
+
+
153 raise ValueError("seqpos must be a list of at least two integers")
+
+
154 # check seqpos for non base pairs
+
+
155 elif self.seqpos and self.helpar_name not in constants.hp_basepairs:
+
+
156 if (max(cols) > len(self.sequence) - 1) or (min(cols) < 0):
+
+
157 raise ValueError(
+
+
158 f"seqpos values must be between 1 and {len(self.sequence)}"
+
+
159 )
+
+
160 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
+
+
161 raise ValueError("seqpos must be a list of at least two integers")
+
+
162
+
+
163 if self.helpar_name in constants.hp_basepairs:
+
+
164 # remove first and last base pairs from cols if they match 0 and len(sequence)
+
+
165 if min(cols) == 0:
+
+
166 cols.pop(0)
+
+
167 if max(cols) == len(self.sequence) - 1:
+
+
168 cols.pop(-1)
+
+
169
+
+
170 # discard first and last base(pairs) from sequence
+
+
171 sequence = self.sequence[1:-1]
+
+
172 # create indices list
+
+
173 indices = cols.copy()
+
+
174 # create subunits list from cols
+
+
175 subunits = [f"{i+1}_{sequence[i-1:i+self.baselen]}" for i in cols]
+
+
176 # clean subunits (leave only basepairs)
+
+
177 pattern = re.compile(r"\d+_[A-Za-z]{2}")
+
+
178 # get removed items
+
+
179 removed_items = [s for s in subunits if not pattern.fullmatch(s)]
+
+
180 # get indices of removed items (in integer format and starting from 0)
+
+
181 removed_numbers = [
+
+
182 int(match.group())
+
+
183 for item in removed_items
+
+
184 if (match := re.match(r"\d+", item))
+
+
185 ]
+
+
186 removed_numbers = list(map(int, removed_numbers))
+
+
187 removed_numbers = [int(i) - 1 for i in removed_numbers]
+
+
188 # remove non basepairs from subunits and indices
+
+
189 subunits = [s for s in subunits if pattern.fullmatch(s)]
+
+
190 indices = [i for i in indices if i not in removed_numbers]
+
+
191 else:
+
+
192 sequence = self.sequence
+
+
193 # create indices list
+
+
194 indices = cols.copy()
+
+
195 # trick for getting the index column from the .ser file
+
+
196 indices.insert(0, 0)
+
+
197 # create subunits list from cols
+
+
198 subunits = [f"{i+1}_{sequence[i:i+1+self.baselen]}" for i in cols]
+
+
199
+
+
200 # read input .ser file
+
+
201 ser_data = read_series(
+
+
202 self.stage_io_dict["in"]["input_ser_path"], usecols=indices
+
+
203 )
+
+
204
+
+
205 # get columns for selected bases
+
+
206 ser_data.columns = subunits
+
+
207
+
+
208 # write output files for all selected bases (one per column)
+
+
209 zf = zipfile.ZipFile(Path(self.stage_io_dict["out"]["output_zip_path"]), "w")
+
+
210 for col in ser_data.columns:
+
+
211 # unstack columns to prevent errors from repeated base pairs
+
+
212 column_data = ser_data[[col]].unstack().dropna().reset_index(drop=True)
+
+
213 column_data.name = col
+
+
214 fu.log(f"Computing base number {col}...")
+
+
215
+
+
216 # column series
+
+
217 series_colfn = f"series_{self.helpar_name}_{col}"
+
+
218 column_data.to_csv(
+
+
219 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv"
+
+
220 )
+
+
221 # save table
+
+
222 zf.write(
+
+
223 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv",
+
+
224 arcname=f"{series_colfn}.csv",
+
+
225 )
+
+
226
+
+
227 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
+
+
228 reduced_data = column_data.iloc[:: self.stride]
+
+
229 axs.plot(reduced_data.index, reduced_data.to_numpy())
+
+
230 axs.set_xlabel("Time (Snapshots)")
+
+
231 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})")
+
+
232 axs.set_title(
+
+
233 f"Helical Parameter vs Time: {self.helpar_name.capitalize()} "
+
+
234 "(base pair "
+
+
+
+
+ -
+
+ E225
+
+ Missing whitespace around operator
+
+
235 f"{'step' if self.baselen==1 else ''} {col})"
+
+
236 )
+
+
237 fig.savefig(
+
+
238 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg",
+
+
239 format="jpg",
+
+
240 )
+
+
241 # save plot
+
+
242 zf.write(
+
+
243 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg",
+
+
244 arcname=f"{series_colfn}.jpg",
+
+
245 )
+
+
246 plt.close()
+
+
247
+
+
248 # columns histogram
+
+
249 hist_colfn = f"hist_{self.helpar_name}_{col}"
+
+
250 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
+
+
251 ybins, x, _ = axs.hist(column_data, bins=self.bins)
+
+
252 pd.DataFrame({self.helpar_name: x[:-1], "density": ybins}).to_csv(
+
+
253 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv",
+
+
254 index=False,
+
+
255 )
+
+
256 # save table
+
+
257 zf.write(
+
+
258 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv",
+
+
259 arcname=f"{hist_colfn}.csv",
+
+
260 )
+
+
261
+
+
262 axs.set_ylabel("Density")
+
+
263 axs.set_xlabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})")
+
+
264 fig.savefig(
+
+
265 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg",
+
+
266 format="jpg",
+
+
267 )
+
+
268 # save plot
+
+
269 zf.write(
+
+
270 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg",
+
+
271 arcname=f"{hist_colfn}.jpg",
+
+
272 )
+
+
273 plt.close()
+
+
274 zf.close()
+
+
275
+
+
276 # Copy files to host
+
+
277 self.copy_to_host()
+
+
278
+
+
279 # Remove temporary file(s)
+
+
280 self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")])
+
+
281 self.remove_tmp_files()
+
+
282
+
+
283 self.check_arguments(output_files_created=True, raise_exception=False)
+
+
284
+
+
285 return 0
+
+
286
+
+
287
+
+
288 def dna_timeseries(
+
+
289 input_ser_path: str,
+
+
290 output_zip_path: str,
+
+
291 properties: Optional[dict] = None,
+
+
292 **kwargs,
+
+
293 ) -> int:
+
+
294 """Create :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` class and
+
+
295 execute the :meth:`launch() <dna.dna_timeseries.HelParTimeSeries.launch>` method."""
+
+
296
+
+
297 return HelParTimeSeries(
+
+
298 input_ser_path=input_ser_path,
+
+
299 output_zip_path=output_zip_path,
+
+
300 properties=properties,
+
+
301 **kwargs,
+
+
302 ).launch()
+
+
303
+
+
304
+
+
305 def main():
+
+
306 """Command line execution of this building block. Please check the command line documentation."""
+
+
307 parser = argparse.ArgumentParser(
+
+
308 description="Created time series and histogram plots for each base pair from a helical parameter series file.",
+
+
309 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
+
+
310 )
+
+
311 parser.add_argument("--config", required=False, help="Configuration file")
+
+
312
+
+
313 required_args = parser.add_argument_group("required arguments")
+
+
314 required_args.add_argument(
+
+
315 "--input_ser_path",
+
+
316 required=True,
+
+
317 help="Helical parameter input ser file path. Accepted formats: ser.",
+
+
318 )
+
+
319 required_args.add_argument(
+
+
320 "--output_zip_path", required=True, help="Path to output directory."
+
+
321 )
+
+
322
+
+
323 args = parser.parse_args()
+
+
324 args.config = args.config or "{}"
+
+
325 properties = settings.ConfReader(config=args.config).get_prop_dic()
+
+
326
+
+
327 dna_timeseries(
+
+
328 input_ser_path=args.input_ser_path,
+
+
329 output_zip_path=args.output_zip_path,
+
+
330 properties=properties,
+
+
331 )
+
+
332
+
+
333
+
+
334 if __name__ == "__main__":
+
+
335 main()
+
+
+