From a583026c38246322ddade9384b6ff7a091f897b8 Mon Sep 17 00:00:00 2001 From: Dan Fornika Date: Wed, 12 Jun 2024 09:39:23 -0700 Subject: [PATCH] create gh-pages branch for docs --- README.md | 210 ----- environment.yaml | 16 - fluviewer/__init__.py | 1 - fluviewer/analysis.py | 1621 ----------------------------------- fluviewer/cli_args.py | 98 --- fluviewer/database.py | 118 --- fluviewer/fluviewer.py | 532 ------------ fluviewer/logging_config.py | 33 - fluviewer/parsers.py | 242 ------ fluviewer/plots.py | 122 --- fluviewer/report.py | 134 --- setup.py | 28 - 12 files changed, 3155 deletions(-) delete mode 100644 README.md delete mode 100644 environment.yaml delete mode 100644 fluviewer/__init__.py delete mode 100644 fluviewer/analysis.py delete mode 100644 fluviewer/cli_args.py delete mode 100644 fluviewer/database.py delete mode 100644 fluviewer/fluviewer.py delete mode 100644 fluviewer/logging_config.py delete mode 100644 fluviewer/parsers.py delete mode 100644 fluviewer/plots.py delete mode 100644 fluviewer/report.py delete mode 100644 setup.py diff --git a/README.md b/README.md deleted file mode 100644 index 4c1f585..0000000 --- a/README.md +++ /dev/null @@ -1,210 +0,0 @@ -[![Consistency Check](https://github.com/BCCDC-PHL/FluViewer/actions/workflows/consistency_check.yml/badge.svg)](https://github.com/BCCDC-PHL/FluViewer/actions/workflows/consistency_check.yml) - -# BCCDC-PHL/FluViewer - -FluViewer is an automated pipeline for generating influenza A virus (IAV) genome sequences from FASTQ data. If provided with a sufficiently diverse and representative database of IAV reference sequences, it can generate sequences regardless of host and subtype without any human intervention required. - -This codebase is derived from [KevinKuchinski/FluViewer](https://github.com/KevinKuchinski/FluViewer), with modifications applied to meet operational needs of BCCDC-PHL. - -## Analysis Stages - -0. **Read Normalization**: The provided reads are normalized and downsampled using a kmer-based approach from [bbmap](https://sourceforge.net/projects/bbmap) called `bbnorm`. This reduces any excessive coverage of certain genome regions. - -1. **Assemble Contigs**: The normalized/downsampled reads are assembled de novo into contigs with the [spades](https://github.com/ablab/spades) assembler. - -2. **BLAST Contigs**: The contigs are then aligned to a database of IAV reference sequences using [BLAST](http://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastDocs). -These alignments are used to trim contigs and roughly position them within their respective genome segment. - -3. **Scaffolding**: A multiple sequencing alignment is conducted on the trimmed/positioned contigs using [clustalw](http://www.clustal.org), generating scaffold sequences for each IAV genome segment. -These scaffolds are aligned to the IAV reference sequence database to find their best matches. -These best matches are used to fill in any missing regions in the scaffold, creating mapping references. - -4. **Read Mapping**: The normalized/downsampled reads are mapped to these mapping references using [bwa](https://github.com/lh3/bwa). - -5. **Variant Calling**: Variants are called from the alignmnents generated by the read mapping stage using [freebayes](https://github.com/freebayes/freebayes) - -6. **Consensus Calling**: Consensus sequences are generated using [bcftools](https://samtools.github.io/bcftools/bcftools.html) based on the variants that are detected between the best available reference and the mapped reads. - -7. 
**Summary Reporting**: Summary reports and plots are generated. - -### Analysis Summaries - -Information is passed between analysis stage via 'analysis summaries'. These are implemented as python dictionaries, with the following structure: - -```json -{ - "process_name": "bbnorm", - "timestamp_analysis_start": "2024-06-11T11:28:37.142158", - "timestamp_analysis_complete": "2024-06-11T11:29:03.860329", - "return_code": 0, - "inputs": { - "input_reads_fwd": "/path/to/sample-01_R1.fastq.gz", - "input_reads_rev": "/path/to/sample-01_R2.fastq.gz" - }, - "outputs": { - "normalized_reads_fwd": "/path/to/outdir/analysis_by_stage/00_normalize_depth/sample-01-normalized_R1.fastq.gz", - "normalized_reads_rev": "/path/to/outdir/analysis_by_stage/00_normalize_depth/sample-01-normalized_R2.fastq.gz", - } -} -``` - -Each stage selects its inputs from the `outputs` of the previous stages's analysis summary. - -### Analysis Stage Diagram - -```mermaid -flowchart TD - forward_reads[Forward Reads] -- input_reads_fwd --> normalization(Read Normalization) - reverse_reads[Reverse Reads] -- input_reads_rev --> normalization - normalization -- normalized_reads_fwd --> assemble_contigs(Assemble Contigs) - normalization -- normalized_reads_rev --> assemble_contigs - fluviewer_db[FluViewer DB] --> blast_contigs(BLAST Contigs) - assemble_contigs -- contigs --> blast_contigs - blast_contigs -- filtered_contig_blast_results --> scaffolding(Scaffolding) - fluviewer_db --> scaffolding - scaffolding -- filtered_scaffold_blast_results --> read_mapping(Read Mapping) - normalization -- normalized_reads_fwd --> read_mapping - normalization -- normalized_reads_rev --> read_mapping - fluviewer_db --> read_mapping - read_mapping -- mapping_refs --> variant_calling(Variant Calling) - read_mapping -- alignment --> variant_calling - read_mapping -- alignment_index --> variant_calling - variant_calling -- variants_filtered --> consensus_calling(Consensus Calling) - variant_calling -- masked_positions --> consensus_calling - read_mapping -- mapping_refs --> consensus_calling - scaffolding -- scaffolds --> reporting(Summary Reporting) - read_mapping -- mapping_refs --> reporting - read_mapping -- alignment --> reporting - variant_calling -- depth_of_cov_freebayes --> reporting - variant_calling -- low_coverage_positions --> reporting - variant_calling -- ambiguous_positions --> reporting - variant_calling -- variant_positions --> reporting - consensus_calling -- consensus_seqs --> reporting -``` - -## Installation -1. Create a virtual environment and install the necessary dependencies using the YAML file provided in this repository. For example, if using conda: - -``` -conda create -n FluViewer -f environment.yaml -``` - -2. Activate the FluViewer environment created in the previous step. For example, if using conda: - -``` -conda activate FluViewer -``` - -3. Install the latest version of FluViewer from this repo. - -``` -pip3 install git+https://github.com/BCCDC-PHL/FluViewer.git -``` - -4. Download and unzip the default FluViewer DB (FluViewer_db.fa.gz) provided in [the BCCDC-PHL/FluViewer-db](https://github.com/BCCDC-PHL/FluViewer-db) repository. Custom DBs can be created and used as well (instructions below). 
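As described in the Analysis Summaries section above, each stage selects its inputs from the `outputs` of the previous stage's analysis summary. The sketch below illustrates roughly how that hand-off could be done; it is illustrative only (not FluViewer's internal API), and the file name and directory layout follow the example summary shown earlier.

```
import json
from pathlib import Path


def load_previous_outputs(stage_outdir: Path) -> dict:
    """Read a completed stage's analysis summary and return its 'outputs' dict."""
    # Assumes the stage wrote an analysis_summary.json into its own output directory,
    # as in the example summary above.
    with open(stage_outdir / "analysis_summary.json") as f:
        summary = json.load(f)
    return summary.get("outputs", {})


# For example, a downstream assembly stage could take its reads from the
# normalization stage (paths here are hypothetical):
# previous = load_previous_outputs(Path("outdir/analysis_by_stage/00_normalize_depth"))
# assembly_inputs = {
#     "normalized_reads_fwd": previous["normalized_reads_fwd"],
#     "normalized_reads_rev": previous["normalized_reads_rev"],
# }
```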
- -## Usage - -``` -usage: fluviewer [-h] -f FORWARD_READS -r REVERSE_READS -d DATABASE [-o OUTDIR] -n OUTPUT_NAME [-i [0-100]] [-l [32-]] [-D [1-]] [-q [0-]] [-v [0-1]] [-V [0-1]] [-N [1-]] [-L [1-]] - [-t [1-]] [-M [1-]] [-g] [--log-level {info,debug}] - -BCCDC-PHL/FluViewer: Influenza A virus consensus sequence generation and variant calling - -optional arguments: - -h, --help show this help message and exit - -f FORWARD_READS, --forward-reads FORWARD_READS - Path to FASTQ file containing forward reads - -r REVERSE_READS, --reverse-reads REVERSE_READS - Path to FASTQ file containing reverse reads - -d DATABASE, --db DATABASE - Path to FASTA file containing FluViewer database - -o OUTDIR, --outdir OUTDIR - Output directory (default=FluViewer_) - -n OUTPUT_NAME, --output-name OUTPUT_NAME - Output name. Includes this name in output files, and in consensus sequence headers - -i [0-100], --min-identity [0-100] - Minimum percent sequence identity between database reference sequences and contigs (default=90) - -l [32-], --min-alignment-length [32-] - Minimum length of alignment between database reference sequences and contigs (default=50) - -D [1-], --min-depth [1-] - Minimum read depth for base calling (default=20) - -q [0-], --min-mapping-quality [0-] - Minimum PHRED score for mapping quality and base quality during variant calling (default=20) - -v [0-1], --variant-threshold-calling [0-1] - Variant allele fraction threshold for calling variants (default=0.75) - -V [0-1], --variant-threshold-masking [0-1] - Variant allele fraction threshold for masking ambiguous variants (default=0.25) - -N [1-], --target-depth [1-] - Target depth for pre-normalization of reads (default=200) - -L [1-], --coverage-limit [1-] - Coverage depth limit for variant calling (default=200) - -t [1-], --threads [1-] - Threads used for contig/scaffold alignments (default=1) - -M [1-], --max-memory [1-] - Gigabytes of memory allocated for normalizing reads (default=max) - -g, --disable-garbage-collection - Disable garbage collection and retain intermediate analysis files - --log-level {info,debug} - Log level (default=info) -``` - - -## FluViewer Database - -FluViewer requires a curated FASTA file "database" of IAV reference sequences. Headers for these sequences must be formatted and annotated as follows: -``` ->unique_id|strain_name(strain_subtype)|sequence_segment|sequence_subtype -``` -Here are some example entries: -``` ->CY230322|A/Washington/32/2017(H3N2)|PB2|none -TCAATTATATTCAGCATGGAAAGAATAAAAGAACTACGGAATCTAATGTCGCAGTCTCGCACTCGCGA... - ->JX309816|A/Singapore/TT454/2010(H1N1)|HA|H1 -CAAAAGCAACAAAAATGAAGGCAATACTAGTAGTTCTGCTATATACATTTACAACCGCAAATGCAGACA... - ->MH669720|A/Iowa/52/2018(H3N2)|NA|N2 -AGGAAAGATGAATCCAAATCAAAAGATAATAACGATTGGCTCTGTTTCTCTCACCATTTCCACAATATG... -``` -For HA and NA segments, strain_subtype should reflect the HA and NA subtypes of the isolate (eg H1N1), but sequence_subtype should only indicate the HA or NA subtype of the segment sequence of the entry (eg H1 for an HA sequence or N1 for an NA sequence). - -For internal segments (i.e. PB2, PB1, PA, NP, M, and NS), strain_subtype should reflect the HA/NA subtypes of the isolate, but 'none' should be entered for sequence_subtype. If strain_subtype is unknown, 'none' should be entered there as well. - -FluViewer will only accept reference sequences composed entirely of uppercase canonical nucleotides (i.e. A, T, G, and C). 
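As a rough illustration of the header format and sequence requirements above, a custom database entry could be checked with something like the following. This is an illustrative sketch only, not part of FluViewer; the regular expression, segment list, and helper name are assumptions made for the example.

```
import re

# Expected header layout:
# >unique_id|strain_name(strain_subtype)|sequence_segment|sequence_subtype
HEADER_RE = re.compile(r'^>([^|]+)\|([^|(]+)\(([^)]+)\)\|([^|]+)\|([^|]+)$')
VALID_SEGMENTS = {'PB2', 'PB1', 'PA', 'HA', 'NP', 'NA', 'M', 'NS'}


def check_db_entry(header: str, sequence: str) -> list:
    """Return a list of problems found with a single database entry."""
    problems = []
    match = HEADER_RE.match(header.strip())
    if not match:
        problems.append(f'malformed header: {header.strip()}')
    else:
        unique_id, strain_name, strain_subtype, segment, subtype = match.groups()
        if segment not in VALID_SEGMENTS:
            problems.append(f'unrecognized segment: {segment}')
    # Sequences must contain only uppercase canonical nucleotides.
    if set(sequence) - set('ATGC'):
        problems.append('sequence contains characters other than uppercase A, T, G, C')
    return problems


print(check_db_entry(
    '>CY230322|A/Washington/32/2017(H3N2)|PB2|none',
    'TCAATTATATTCAGCATGGAAAGAATAAAAGAACTACGGAATCTAATGTCGCAGTCTCGCACTCGCGA'))
```

Running this on the first example entry above reports no problems (an empty list).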
- -During analysis, FluViewer will check if a BLAST database has been built based on the fasta file that is supplied with the `-d` (or `--db`) flag, by looking for the `.nhr`, `.nin` and `.nsq` BLAST database files associated with the fasta database. If any of those files are not found, the BLAST database will be built using `makeblastdb`. FluViewer expects that it will be able to write those files alongside the fasta database when this occurs. - -## FluViewer Output - -FluViewer generates four main output files for each library: - -1. A FASTA file containing consensus sequences for the IAV genome segments: `_consensus_seqs.fa` -2. A sorted BAM file with reads mapped to the mapping references generated for that library: `_alignment.bam` and `_alignment.bam.bai`. The mapping reference is also retained: `_mapping_refs.fa` -3. A report TSV file describing segment, subtype, and sequencing metrics for each consensus sequence generated: `_report.tsv` -4. Depth of coverage plots for each segment: `_depth_of_cov.png` - -Headers in the FASTA file have the following format: -``` ->output_name|segment|subject -``` - - -The report TSV files contain the following columns: - -- `seq_name` : the name of the consensus sequence described by this row -- `segment` : IAV genome segment (PB2, PB1, PA, HA, NP, NA, M, NS) -- `subtype` : HA or NA subtype ("none" for internal segments) -- `reads_mapped` : the number of sequencing reads mapped to this segment (post-normalization/downsampling) -- `seq_length` : the length (in nucleotides) of the consensus sequence generated by FluViewer -- `scaffold_completeness` : the number of nucleotide positions in the scaffold that were assembled from the provided reads (post-normalization/downsampling) -- `consensus_completeness` : the number of nucleotide positions in the consensus with a succesful base call (e.g. 
A, T, G, or C) -- `ref_seq_used` : the unique ID and strain name of the scaffold's best-matching reference sequence used for filling in missing regions in the scaffold (if the scaffold completeness was 100%, then this is provided pro forma as none of it was used to create the mapping reference) - - -The depth of coverage plots contains the following elements: -- A black line indicating the depth of coverage pre-variant calling -- A grey line indicating the depth of coverage post-variant calling -- Red shading covering positions where coverage was too low for base calling -- Orange lines indicating positions where excess variation resulted in an ambiguous base call -- Blue lines indicating positions where a variant was called diff --git a/environment.yaml b/environment.yaml deleted file mode 100644 index 7f065ff..0000000 --- a/environment.yaml +++ /dev/null @@ -1,16 +0,0 @@ -name: fluviewer -channels: - - bioconda - - conda-forge - - defaults -dependencies: - - bbmap=39.01 - - bcftools=1.17 - - blast=2.14.1 - - bwa=0.7.17 - - samtools=1.17 - - spades=3.15.3 - - clustalw=2.1 - - freebayes=1.3.6 - - pandas=2.0.3 - - seaborn=0.12.2 diff --git a/fluviewer/__init__.py b/fluviewer/__init__.py deleted file mode 100644 index 18bc4da..0000000 --- a/fluviewer/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = '0.1.11-1' diff --git a/fluviewer/analysis.py b/fluviewer/analysis.py deleted file mode 100644 index 8bcf245..0000000 --- a/fluviewer/analysis.py +++ /dev/null @@ -1,1621 +0,0 @@ -import datetime -import os -import json -import logging -import shutil -import subprocess -import sys - -from collections import Counter -from pathlib import Path -from typing import List - -import pandas as pd - -from fluviewer import parsers - - -log = logging.getLogger(__name__) - - -error_messages_by_code = { - 1: 'Error creating output directory.', - 2: 'Error running BBNorm.', - 3: 'Error running SPAdes.', - 4: 'No contigs assembled.', - 5: 'Error running BLASTn.', - 6: 'Error running BLASTn.', - 7: 'No contigs aligned to reference sequences.', - 8: 'Multiple subtypes detected for segment.', - 9: 'Error running ClustalW.', - 10: 'Error generating scaffold sequences.', - 11: 'Error running BLASTn.', - 12: 'No contigs aligned to reference sequences.', - 13: 'Multiple subtypes detected for segment.', - 14: 'Error running ClustalW.', - 15: 'Error generating scaffold sequences.', - 16: 'Error running BLASTn.', - 17: 'No contigs aligned to reference sequences.', - 18: 'Multiple subtypes detected for segment.', -} - - -def run(terminal_command, outdir, out_name, process_name, error_code): - """ - A generalized function for running subprocesses, logging their output, and - trapping erroneous exit statuses. - - :param terminal_command: The command to be run in the terminal. - :type terminal_command: str - :param outdir: Path to the output directory. - :type outdir: str - :param out_name: Name of the output directory. - :type out_name: str - :param process_name: Name of the process being run. - :type process_name: str - :param error_code: Exit status if the subprocess fails. - :type error_code: int - :return: Exit status of the subprocess. 
- :rtype: int - """ - logs_dir = os.path.join(outdir, 'logs') - os.makedirs(logs_dir, exist_ok=True) - stdout_file = os.path.join(logs_dir, f'{process_name}_stdout.txt') - stderr_file = os.path.join(logs_dir, f'{process_name}_stderr.txt') - - script_file = os.path.join(outdir, f'{process_name}_script.sh') - with open(script_file, 'w') as f: - f.write('#!/bin/bash\n\n') - f.write(terminal_command) - f.write('\n') - os.chmod(script_file, 0o755) - - complete_process = None - try: - with open(stdout_file, 'w') as stdout_file: - with open(stderr_file, 'w') as stderr_file: - complete_process = subprocess.run( - script_file, - stdout=stdout_file, - stderr=stderr_file, - shell=True - ) - except Exception as e: - log.error(f'Error running subprocess {process_name}: {e}') - return error_code - - return_code = complete_process.returncode - - if return_code != 0: - log.error(f'Subprocess {process_name} failed (Exit status: {return_code})') - return error_code - - return return_code - - -def normalize_depth( - inputs: dict, - outdir: Path, - out_name: str, - fwd_reads_raw: Path, - rev_reads_raw: Path, - depth: int, - max_memory: int, -): - """ - BBNorm is run on the input reads to downsample regions of deep coverage - (using a k-mer frequency approach). This balances coverage, increases - analysis speed, and limits the impacts of artefactual reads. - - :param inputs: Dictionary of input files, with keys 'raw_reads_fwd' and 'raw_reads_rev'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: str - :param out_name: Name of the output directory. - :type out_name: str - :param fwd_reads_raw: Path to the raw forward reads. - :type fwd_reads_raw: str - :param rev_reads_raw: Path to the raw reverse reads. - :type rev_reads_raw: str - :param depth: Target depth of coverage. - :type depth: int - :param max_memory: Maximum memory to allocate to BBNorm. - :type max_memory: int - :return: Summary of the analysis. 
- :rtype: dict - """ - timestamp_analysis_start = datetime.datetime.now().isoformat() - log.info('Normalizing depth of coverage and subsampling reads...') - logs_dir = os.path.join(outdir, 'logs') - os.makedirs(logs_dir, exist_ok=True) - - input_reads_fwd = inputs.get('input_reads_fwd', None) - input_reads_rev = inputs.get('input_reads_rev', None) - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - - normalized_reads_fwd = os.path.join(outdir, f'{out_name}-normalized_R1.fastq') - normalized_reads_rev = os.path.join(outdir, f'{out_name}-normalized_R2.fastq') - - terminal_command = (f'bbnorm.sh in={input_reads_fwd} in2={input_reads_rev} ' - f'out={normalized_reads_fwd} out2={normalized_reads_rev} target={depth}') - terminal_command = (terminal_command + f' -Xmx{max_memory}g' - if max_memory is not None else terminal_command) - - # add gzip compression to output files - terminal_command += f'\n\ngzip -f {normalized_reads_fwd}\n' - terminal_command += f'\ngzip -f {normalized_reads_rev}\n' - - process_name = 'bbnorm' - error_code = 2 - - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running BBNorm (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - bbnorm_stderr_log_path = os.path.join(logs_dir, 'bbnorm_stderr.txt') - - parsed_bbnorm_log = parsers.parse_bbnorm_log(bbnorm_stderr_log_path) - - for pass_num, pass_stats in parsed_bbnorm_log.items(): - if not pass_num.startswith('pass_'): - continue - pass_num_int = int(pass_num.split('_')[-1]) - log.info(f'Normalization pass {pass_num_int}: Total reads in: {pass_stats["total_reads_in"]}') - log.info(f'Normalization pass {pass_num_int}: Percent reads kept: {pass_stats["total_reads_kept_percent"]}%') - log.info(f'Normalization pass {pass_num_int}: Percent unique: {pass_stats["percent_unique"]}%') - log.info(f'Normalization pass {pass_num_int}: Average depth (unique kmers): {pass_stats["depth_average_unique_kmers"]}') - log.info(f'Normalization pass {pass_num_int}: Average depth (all kmers): {pass_stats["depth_average_all_kmers"]}') - log.info(f'Normalization pass {pass_num_int}: Approx. median read depth: {pass_stats["approx_read_depth_median"]}') - - with open(os.path.join(logs_dir, 'bbnorm_log.json'), 'w') as f: - json.dump(parsed_bbnorm_log, f, indent=4) - f.write('\n') - - timestamp_analysis_complete = datetime.datetime.now().isoformat() - - outputs = { - 'normalized_reads_fwd': os.path.abspath(normalized_reads_fwd) + '.gz', - 'normalized_reads_rev': os.path.abspath(normalized_reads_rev) + '.gz', - } - - analysis_summary = { - 'process_name': process_name, - 'timestamp_analysis_start': timestamp_analysis_start, - 'timestamp_analysis_complete': timestamp_analysis_complete, - 'return_code': return_code, - 'inputs': inputs, - 'outputs': outputs, - } - - with open(os.path.join(outdir, 'analysis_summary.json'), 'w') as f: - json.dump(analysis_summary, f, indent=4) - f.write('\n') - - return analysis_summary - - -def assemble_contigs( - inputs: dict, - outdir: Path, - out_name: str, -): - """ - Normalized, downsampled reads are assembled de novo into contigs - using SPAdes. - - :param inputs: Dictionary of input files, with keys 'normalized_reads_fwd' and 'normalized_reads_rev'. - :type inputs: dict - :param outdir: Path to the output directory. 
- :type outdir: Path - :param out_name: Name used for outputs - :type out_name: str - :return: Summary of the analysis. - :rtype: dict - """ - log.info('Assembling reads into contigs...') - timestamp_analysis_start = datetime.datetime.now().isoformat() - - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - - logs_dir = os.path.join(outdir, 'logs') - os.makedirs(logs_dir, exist_ok=True) - - spades_output = os.path.join(outdir, 'spades_output') - fwd_reads = inputs.get('normalized_reads_fwd', None) - rev_reads = inputs.get('normalized_reads_rev', None) - - os.makedirs(spades_output, exist_ok=True) - - terminal_command = (f'spades.py --rnaviral --isolate -1 {fwd_reads} ' - f'-2 {rev_reads} -o {spades_output}') - - process_name = 'spades' - error_code = 3 - - script_file = os.path.join(outdir, f'{process_name}_script.sh') - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - analysis_summary['return_code'] = return_code - if not os.path.isfile(os.path.join(spades_output, 'contigs.fasta')): - log.error('No contigs assembled! Aborting analysis.') - error_code = 4 - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - analysis_summary['inputs'] = inputs - return analysis_summary - - num_contigs = 0 - src_contigs_path = os.path.join(spades_output, 'contigs.fasta') - dest_contigs_path = os.path.join(outdir, f'{out_name}_contigs.fasta') - shutil.copy(src_contigs_path, dest_contigs_path) - with open(dest_contigs_path, 'r') as f: - for line in f: - if line.startswith('>'): - num_contigs += 1 - log.info('Contigs assembled successfully.') - log.info(f'Assembled {num_contigs} contigs.') - - timestamp_analysis_complete = datetime.datetime.now().isoformat() - - outputs = { - 'contigs': os.path.abspath(dest_contigs_path), - } - - analysis_summary['process_name'] = process_name - analysis_summary['timestamp_analysis_complete'] = timestamp_analysis_complete - analysis_summary['return_code'] = return_code - analysis_summary['outputs'] = outputs - - with open(os.path.join(outdir, 'analysis_summary.json'), 'w') as f: - json.dump(analysis_summary, f, indent=4) - f.write('\n') - - return analysis_summary - - -def filter_contig_blast_results(blast_results, outdir, out_name, identity, length): - """ - Contigs alignments are filtered to discard spurious alignments. - - The length and sequence identity of each alignment must exceed certain - thresholds. - - Afterwards, a single reference sequence is selected for each - genome segment. The reference sequence with the most positions covered by - contigs. Ties are broken by: - - 1. The highest sequence identity - 2. The longest reference sequence length - 3. First alphabetically - - Once a reference sequence has been chosen for each segment, only contig alignments - to those reference sequences are retained. - - :param blast_results: BLASTn results. - :type blast_results: pd.DataFrame - :param out_name: Name used for outputs. - :type out_name: str - :param identity: Minimum sequence identity for BLASTn hits. - :type identity: float - :param length: Minimum alignment length for BLASTn hits. - :type length: int - :return: Filtered BLASTn results. - :rtype: pd.DataFrame - """ - log.info('Filtering contig BLAST alignments...') - - # - # Annotate each ref seq with its segment and subtype. 
- subject_annots = blast_results[['sseqid']].drop_duplicates() - - # sseqid format: accession|strain_name|segment|subtype - get_segment = lambda row: row['sseqid'].split('|')[2] - subject_annots['segment'] = subject_annots.apply(get_segment, axis=1) - - get_subtype = lambda row: row['sseqid'].split('|')[3] - subject_annots['subtype'] = subject_annots.apply(get_subtype, axis=1) - - # Merge segment and subtype annotations back into blast_results - blast_results = pd.merge(blast_results, subject_annots, on='sseqid') - - # Check for evidence of mixed infections. First, find best alignments(s) - # for each contig. Next, check if each segment is represented by only one subtype. - cols = ['qseqid', 'bitscore'] - # Find best alignment(s) for each contig, based on bitscore - max_bitscores = blast_results[cols].groupby('qseqid').max().reset_index() - # Use the qseqid and bitscore to find the best alignment(s) for each contig - best_results = pd.merge(blast_results, max_bitscores, on=cols) - - log.info('Checking for mixed infections...') - # Check for multiple subtypes for each segment - segments = blast_results['segment'].unique() - for segment in segments: - log.info(f'Checking segment {segment}...') - segment_results = best_results[best_results['segment']==segment] - log.info(f'Found {len(segment_results)} contigs for segment {segment}.') - segment_subtypes = segment_results['subtype'].unique() - if len(segment_subtypes) > 1: - segment_subtypes = ', '.join(segment_subtypes) - log.error(f'Multiple subtypes detected for segment {segment} ' - f'({segment_subtypes})! Aborting analysis.\n') - error_code = 8 - exit(error_code) - else: - if segment_subtypes[0] == 'none': - log.info(f'No subtype determined for segment {segment}.') - else: - log.info(f'Subtype determined for segment {segment}: {segment_subtypes[0]}.') - - # - # Find ref seq(s) most covered by contigs. - log.info('Finding reference sequences most covered by contigs...') - def count_cov_pos(data_frame): - """ - Count the number of positions covered by contigs. - Iterates through each alignment, and adds the range of positions - covered by the alignment to a set. The length of the set is the number - of covered positions. - - :param data_frame: DataFrame containing BLASTn results. - :type data_frame: pd.DataFrame - :return: Number of covered positions. 
- :rtype: int - """ - cov_positions = set() - for index, row in data_frame.iterrows(): - start = min([row['sstart'], row['send']]) - end = max([row['sstart'], row['send']]) - cov_positions = cov_positions.union(set(range(start, end + 1))) - return len(cov_positions) - - cols = [ - 'sseqid', - 'segment', - 'subtype', - 'sstart', - 'send' - ] - group_cols = [ - 'sseqid', - 'segment', - 'subtype' - ] - cov_pos = blast_results[cols].drop_duplicates() - - # Count the number of positions covered by contigs for each ref seq - cov_pos = cov_pos.groupby(group_cols).apply(count_cov_pos).reset_index() - log.info('Found covered positions for each reference sequence.') - - cov_pos.columns = [ - 'sseqid', - 'segment', - 'subtype', - 'covered_positions' - ] - cols = [ - 'segment', - 'subtype', - 'covered_positions' - ] - group_cols = ['segment', 'subtype'] - - # Find the ref seq(s) with the most covered positions for each segment/subtype - max_cov_pos = cov_pos[cols].drop_duplicates() - max_cov_pos = max_cov_pos.groupby(group_cols).max().reset_index() - log.info('Found reference sequence(s) with the most covered positions for each segment/subtype.') - merge_cols = ['segment', 'subtype', 'covered_positions'] - max_cov_pos = pd.merge(cov_pos, max_cov_pos, on=merge_cols) - - # Log a summary of covered positions for each segment/subtype - for segment in segments: - segment_results = max_cov_pos[max_cov_pos['segment']==segment] - subtype_to_log = '(undetermined)' - if segment_results['subtype'].values[0] != 'none': - subtype_to_log = segment_results['subtype'].values[0] - for subtype in segment_results['subtype'].unique(): - num_covered_positions = segment_results[segment_results['subtype']==subtype]['covered_positions'].values[0] - log.info(f'Segment: {segment}, Subtype: {subtype_to_log}: {num_covered_positions} covered positions.') - - max_cov_pos = max_cov_pos[['sseqid', 'covered_positions']] - log.info('Found reference sequence(s) with the most covered positions for each segment/subtype.') - blast_results = pd.merge(blast_results, max_cov_pos, on='sseqid') - num_blast_results = len(blast_results) - log.info(f'Found {num_blast_results} total contig alignments.') - - # - # Find remaining ref seq(s) with most identical positions. - def count_id_pos(data_frame): - """ - Count the number of identical positions between contigs and reference. - Determined by comparing the query and subject sequences from BLASTn. - Iterates through each base in the query sequence, and if the base is - a nucleotide and matches the subject sequence, the subject position is - added to a set. The length of the set is the number of identical - positions. - - :param data_frame: DataFrame containing BLASTn results. - :type data_frame: pd.DataFrame - :return: Number of identical positions. 
- :rtype: int - """ - identical_positions = set() - for index, row in data_frame.iterrows(): - start = min([row['sstart'], row['send']]) - increment = 1 if row['sstart'] <= row['send'] else -1 - subject_position = start - for qbase, sbase in zip(row['qseq'], row['sseq']): - if sbase in 'ATGC' and qbase == sbase: - identical_positions.add(subject_position) - if sbase != '-': - subject_position += increment - return len(identical_positions) - - cols = [ - 'sseqid', - 'segment', - 'subtype', - 'sstart', - 'send', - 'qseq', - 'sseq' - ] - group_cols = ['sseqid', 'segment', 'subtype'] - ident_pos = blast_results[cols].drop_duplicates() - # Count the number of identical positions for each ref seq - ident_pos = ident_pos.groupby(group_cols).apply(count_id_pos).reset_index() - ident_pos.columns = ['sseqid', 'segment', 'subtype', 'identical_positions'] - cols = ['segment', 'subtype', 'identical_positions'] - group_cols = ['segment', 'subtype'] - - # Find the ref seq(s) with the most identical positions for each segment/subtype - max_ident_pos = ident_pos[cols].drop_duplicates() - max_ident_pos = max_ident_pos.groupby(group_cols).max().reset_index() - merge_cols = ['segment', 'subtype', 'identical_positions'] - max_ident_pos = pd.merge(ident_pos, max_ident_pos, on=merge_cols) - log.info('Found reference sequence(s) with the most identical positions for each segment/subtype.') - for segment in segments: - segment_results = max_ident_pos[max_ident_pos['segment']==segment] - for subtype in segment_results['subtype'].unique(): - subtype_to_log = subtype if subtype != 'none' else 'undetermined' - num_identical_positions = segment_results[segment_results['subtype']==subtype]['identical_positions'].values[0] - log.info(f'Segment {segment}, Subtype {subtype_to_log}: {num_identical_positions} identical positions.') - - cols = ['sseqid', 'identical_positions'] - max_ident_pos = max_ident_pos[cols].drop_duplicates() - blast_results = pd.merge(blast_results, max_ident_pos, on='sseqid') - - # Take longest remaining ref seq for each segment/subtype. - cols = ['segment', 'subtype', 'slen'] - group_cols = ['segment', 'subtype'] - longest_sseq = blast_results[cols].drop_duplicates() - longest_sseq = longest_sseq.groupby(group_cols).min().reset_index() - blast_results = pd.merge(blast_results, longest_sseq, on=cols) - - # Take first alphabetical remaining ref seq for each segment/subtype. - cols = ['segment', 'subtype', 'sseqid'] - group_cols = ['segment', 'subtype'] - first_alpha_sseq = blast_results[cols].drop_duplicates() - first_alpha_sseq = first_alpha_sseq.groupby(group_cols).min().reset_index() - blast_results = pd.merge(blast_results, first_alpha_sseq, on=cols) - for segment in segments: - segment_results = blast_results[blast_results['segment']==segment] - for subtype in segment_results['subtype'].unique(): - subtype_to_log = subtype if subtype != 'none' else '(undetermined)' - ref_seq = segment_results[segment_results['subtype']==subtype]['sseqid'].values[0] - log.info(f'Segment {segment}, Subtype: {subtype_to_log}. Selected reference sequence: {ref_seq}') - - return blast_results - - -def blast_contigs(inputs, outdir, out_name, threads, identity, length): - """ - Contigs are aligned to reference sequences using BLASTn. - - :param inputs: Dictionary of input files, with keys 'contigs'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: Path - :param out_name: Name used for outputs. - :type out_name: str - :param threads: Number of threads to use for BLASTn. 
- :type threads: int - :param identity: Minimum sequence identity for BLASTn hits. - :type identity: float - :param length: Minimum alignment length for BLASTn hits. - :type length: int - :return: BLASTn results. - :rtype: pd.DataFrame - """ - log.info('BLASTing contigs to reference sequences...') - log_dir = os.path.join(outdir, 'logs') - os.makedirs(log_dir, exist_ok=True) - - timestamp_analysis_start = datetime.datetime.now().isoformat() - - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - - db = os.path.abspath(inputs.get('database', None)) - blast_output_path = os.path.join(outdir, f'{out_name}_contigs_blast.tsv') - contigs = inputs.get('contigs', None) - - cols = [ - 'qseqid', - 'sseqid', - 'pident', - 'length', - 'bitscore', - 'sstart', - 'send', - 'qseq', - 'sseq', - 'slen', - ] - with open(blast_output_path, 'w') as f: - f.write('\t'.join(cols) + '\n') - - cols_str = ' '.join(cols) - - terminal_command = (f'blastn -query {contigs} -db {db} ' - f'-num_threads {threads} -outfmt "6 {cols_str}" ' - f'>> {blast_output_path}') - - process_name = 'blastn_contigs' - error_code = 6 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - - full_blast_results = pd.read_csv(blast_output_path, sep='\t') - - total_num_blast_results = len(full_blast_results) - - if total_num_blast_results == 0: - log.error(f'No contigs aligned to reference sequences! Aborting analysis.') - error_code = 7 - exit(error_code) - - log.info('Contigs BLASTed against reference sequences.') - log.info(f'Found {total_num_blast_results} total matches.') - - blast_results = full_blast_results[full_blast_results['pident'] >= identity] - blast_results = blast_results[blast_results['length'] >= length] - - identity_and_length_filtered_blast_output_path = os.path.join(outdir, f'{out_name}_contigs_blast_identity_and_length_filtered.tsv') - blast_results.to_csv(identity_and_length_filtered_blast_output_path, sep='\t', index=False) - num_blast_results_after_identity_and_length_filter = len(blast_results) - log.info(f'Found {num_blast_results_after_identity_and_length_filter} matches with at least {identity}% identity and {length} bp alignment length.') - - percent_blast_results_retained = num_blast_results_after_identity_and_length_filter / total_num_blast_results * 100 - log.info(f'Retained {percent_blast_results_retained:.2f}% matches for further analysis.') - - filtered_blast_results = filter_contig_blast_results(blast_results, outdir, out_name, identity, length) - - timestamp_analysis_complete = datetime.datetime.now().isoformat() - - if len(filtered_blast_results) == 0: - log.error(f'No contigs aligned to reference sequences! 
Aborting analysis.') - error_code = 7 - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - num_filtered_blast_results = len(filtered_blast_results) - log.info(f'Remaining contig alignments after filtering: {num_filtered_blast_results}.') - - filtered_blast_output_path = os.path.join(outdir, f'{out_name}_contigs_blast_filtered.tsv') - filtered_blast_results.to_csv(filtered_blast_output_path, sep='\t', index=False) - - outputs = { - 'all_contig_blast_results': os.path.abspath(blast_output_path), - 'identity_and_length_filtered_contig_blast_results': os.path.abspath(identity_and_length_filtered_blast_output_path), - 'filtered_contig_blast_results': os.path.abspath(filtered_blast_output_path), - } - - analysis_summary = { - 'process_name': process_name, - 'timestamp_analysis_start': timestamp_analysis_start, - 'timestamp_analysis_complete': timestamp_analysis_complete, - 'return_code': return_code, - 'inputs': inputs, - 'outputs': outputs, - } - with open(os.path.join(outdir, 'analysis_summary.json'), 'w') as f: - json.dump(analysis_summary, f, indent=4) - f.write('\n') - - return analysis_summary - - -def filter_scaffold_blast_results(blast_results): - """ - A single reference sequence is chosen for each segment scaffold. - - First, the bitscores of all alignments between a scaffold and a reference sequence - are summed. The bitscore sum is calculated for each scaffold-reference - sequence pairing. The reference sequence giving the highest bitscore sum is - selected for each scaffold, with ties being broken by using the first - alphabetically. Once the best-matching reference sequence has been selected for - each segment scaffold, all other alignments are discarded. - - :param blast_results: BLASTn results. - :type blast_results: pd.DataFrame - :return: Filtered BLASTn results. - :rtype: pd.DataFrame - """ - log.info('Filtering scaffold BLAST alignments...') - - # Remove reversed alignments (they should be artefactual at this point). - # check number of reversed alignments - num_reversed_alignments = len(blast_results[blast_results['send'] < blast_results['sstart']]) - log.info(f'Found {num_reversed_alignments} reversed alignments.') - log.info('Removing reversed alignments...') - blast_results = blast_results[blast_results['send'] > blast_results['sstart']] - num_remaining_alignments = len(blast_results) - log.info(f'Remaining scaffold alignments after removing reversed alignments: {num_remaining_alignments}.') - - #Annotate scaffold seqs with segment. - query_annots = blast_results[['qseqid']].drop_duplicates() - get_segment = lambda row: row['qseqid'].split('|')[1].split('_')[0] - query_annots['segment'] = query_annots.apply(get_segment, axis=1) - blast_results = pd.merge(blast_results, query_annots, on='qseqid') - - # Find best-matching reference sequence for each segment. - # Best match is defined as the reference sequence with the highest bitscore sum. - # Ties are broken by selecting the first alphabetically. 
- cols = ['segment', 'sseqid', 'bitscore'] - group_cols = ['segment', 'sseqid'] - combo_scores = blast_results[cols].groupby(group_cols).sum().reset_index() - cols = ['segment', 'bitscore'] - group_cols = ['segment'] - max_scores = combo_scores[cols].groupby(group_cols).max().reset_index() - merge_cols = ['segment', 'bitscore'] - max_scores = pd.merge(max_scores, combo_scores, on=merge_cols) - cols = ['segment', 'sseqid'] - group_cols = ['segment'] - first_alpha = max_scores[cols].groupby(group_cols).min().reset_index() - merge_cols = ['segment', 'sseqid'] - blast_results = pd.merge(blast_results, first_alpha, on=merge_cols) - for segment in blast_results['segment'].unique(): - segment_results = blast_results[blast_results['segment']==segment] - ref_seq = segment_results['sseqid'].values[0] - log.info(f'Selected reference sequence for segment {segment}: {ref_seq}') - - return blast_results - - -def make_scaffold_seqs(inputs, outdir, out_name): - """ - A scaffold sequence is created for each genome segment by joining and - collapsing all the contigs describing that segment. - - Unaligned leading and trailing sequences are trimmed from the - contigs. - - Next, leading and trailing Ns are added to the contig so that it is - properly positioned within the segment (based on the subject-start and - subject-end coordinates of its alignment to the selected reference sequence). - - Next, clustalW is used to generate a multiple sequence alignment of the - trimmed, positioned contigs. This multiple sequence alignment is used to - generate a consensus sequence of the regions of the segment covered by - contigs. - - :param inputs: Dictionary of input files, with keys 'filtered_contig_blast_results'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: Path - :param out_name: Name used for outputs. - :type out_name: str - :return: Summary of the analysis. - :rtype: dict - """ - log.info('Creating scaffolds...') - log_dir = os.path.join(outdir, 'logs') - os.makedirs(log_dir, exist_ok=True) - - timestamp_analysis_start = datetime.datetime.now().isoformat() - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - - outputs = {} - - filtered_contig_blast_results_path = inputs.get('filtered_contig_blast_results', None) - - # The 'segment' field of the filtered blast results may include 'NA' values, which - # represent the Neuraminidase (NA) segment. - # Disable the na_filter option to prevent pandas from converting 'NA' to NaN. - blast_results = pd.read_csv(filtered_contig_blast_results_path, sep='\t', na_filter=False) - - # Make sure contigs are all in the forward orientation. - rev_comp_bases = {'A': 'T', - 'T': 'A', - 'G': 'C', - 'C': 'G', - 'N': 'N', - '-': '-', - 'W': 'W', - 'S': 'S', - 'M': 'K', - 'K': 'M', - 'R': 'Y', - 'Y': 'R', - 'B': 'V', - 'D': 'H', - 'H': 'D', - 'V': 'B'} - - rev_comp_seq = lambda seq: ''.join(rev_comp_bases[base] - for base in seq[::-1]) - get_start = lambda row: min([row['sstart'], row['send']]) - - blast_results['start'] = blast_results.apply(get_start, axis=1) - get_end = lambda row: max([row['sstart'], row['send']]) - blast_results['end'] = blast_results.apply(get_end, axis=1) - - def flip_qseq(row): - """ - Flip the query sequence if the alignment is in the reverse orientation. - - :param row: BLASTn result. - :type row: pd.Series - :return: Flipped query sequence. 
- :rtype: str - """ - if row['sstart'] > row['send']: - log.info(f'Flipping seq for contig: {row["qseqid"]}') - return rev_comp_seq(row['qseq']) - else: - return row['qseq'] - - blast_results['qseq'] = blast_results.apply(flip_qseq, axis=1) - - def flip_sseq(row): - """ - Flip the subject sequence if the alignment is in the reverse orientation. - - :param row: BLASTn result. - :type row: pd.Series - :return: Flipped subject sequence. - :rtype: str - """ - if row['sstart'] > row['send']: - log.info(f'Flipping seq for ref: {row["sseqid"]}') - return rev_comp_seq(row['sseq']) - else: - return row['sseq'] - - blast_results['sseq'] = blast_results.apply(flip_sseq, axis=1) - blast_results_flipped_path = os.path.join(outdir, f'{out_name}_contigs_blast_filtered_flipped.tsv') - - blast_results.to_csv(blast_results_flipped_path, sep='\t', index=False) - log.info(f'Wrote flipped BLASTn results to {blast_results_flipped_path}') - outputs['flipped_contig_blast_results'] = os.path.abspath(blast_results_flipped_path) - - clustalw_return_code = None - # Trim contigs based on their alignments to reference sequences. Also - # add leading and trailing Ns to contig so that it is properly positioned - # within the genome segment. - segments = blast_results['segment'].unique() - contig_counter = {segment: 0 for segment in segments} - scaffold_seqs = {} - for segment in segments: - contigs = os.path.join(outdir, f'{segment}_contigs.fa') - with open(contigs, 'w') as f: - contig_results = blast_results[blast_results['segment']==segment] - for index, row in contig_results.iterrows(): - header = f'>{segment}_contig_{contig_counter[segment]}\n' - f.write(header) - seq = 'N' * (row['start'] - 1) - seq += row['qseq'].replace('-', '') - seq += ('N' * (row['slen'] - row['end'])) - f.write(seq + '\n') - contig_counter[segment] += 1 - log.info(f'Wrote {contig_counter[segment]} contigs for segment {segment} to {contigs}') - outputs[f'{segment}_contigs'] = os.path.abspath(contigs) - # Generate multiple sequence alignments of trimmed/positioned contigs. - log.info(f'Aligning contigs for segment {segment} with clustalw...') - aligned_contigs = os.path.join(outdir, f'{segment}_contigs.afa') - - if contig_counter[segment] > 1: - log.info(f'Generating multiple sequence alignment for segment {segment}...') - terminal_command = (f'clustalw -INFILE={contigs} ' - f'-OUTFILE={aligned_contigs} -OUTPUT=FASTA') - process_name = f'clustalw_{segment}' - error_code = 9 - clustalw_return_code = run(terminal_command, outdir, out_name, process_name, error_code) - analysis_summary['return_code'] = clustalw_return_code - else: - log.info(f'Only one contig for segment {segment}, skipping alignment.') - shutil.copyfile(contigs, aligned_contigs) - analysis_summary['return_code'] = 0 - - outputs[f'{segment}_aligned_contigs'] = os.path.abspath(aligned_contigs) - - # Replace leading and trailing Ns with dots so that they are ignored - # when determining consensus bases. - seqs = {} - with open(aligned_contigs, 'r') as input_file: - for line in input_file: - if line[0] == '>': - header = line.strip() - seqs[header] = '' - else: - seqs[header] += line.strip() - clean_seqs = [] - for seq in seqs.values(): - head_len = len(seq) - len(seq.lstrip('N-')) - tail_len = len(seq) - len(seq.rstrip('N-')) - seq = seq.strip('N-') - seq = ('.' * head_len) + seq - seq += ('.' * tail_len) - clean_seqs.append(seq) - - # Check that all seqs in the multiple seq alignment are the - # same length. 
''' - alignment_lengths = set(len(seq) for seq in clean_seqs) - if len(alignment_lengths) > 1: - log.error(f'Multiple sequence alignment for {segment} ' - f'generated unequal alignment lengths! Aborting analysis.\n') - error_code = 10 - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - analysis_summary['outputs'] = outputs - return analysis_summary - elif len(alignment_lengths) == 0: - log.error(f'No sequences in the multiple sequence alignment for {segment}! Aborting analysis.\n') - error_code = 11 - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - analysis_summary['outputs'] = outputs - return analysis_summary - - # Create consensus sequence of multiple seq alignments, i.e. the - # scaffolds. - alignment_length = list(alignment_lengths)[0] - scaffold_seq = '' - for i in range(alignment_length): - bases = Counter(seq[i] for seq in clean_seqs if seq[i] not in '.') - if bases.most_common(1) == []: - scaffold_seq += 'N' - elif bases.most_common(1)[0][1] / len(bases) > 0.5: - scaffold_seq += bases.most_common(1)[0][0] - else: - scaffold_seq += 'N' - scaffold_seqs[segment] = scaffold_seq - - # Write out scaffolds. - segments = 'PB2 PB1 PA HA NP NA M NS'.split(' ') - segments = [segment for segment in segments if segment in scaffold_seqs] - scaffold_seqs = {segment: scaffold_seqs[segment] for segment in segments} - scaffolds_path = os.path.join(outdir, f'{out_name}_scaffolds.fa') - with open(scaffolds_path, 'w') as f: - for segment, seq in scaffold_seqs.items(): - header = f'>{out_name}|{segment}_scaffold' - f.write(header + '\n') - f.write(seq + '\n') - - outputs['scaffolds'] = os.path.abspath(scaffolds_path) - - log.info(f'Wrote {len(scaffold_seqs)} scaffolds to {scaffolds_path}') - - timestamp_analysis_complete = datetime.datetime.now().isoformat() - - analysis_summary['process_name'] = 'make_scaffold_seqs' - analysis_summary['timestamp_analysis_complete'] = timestamp_analysis_complete - analysis_summary['outputs'] = outputs - - with open(os.path.join(outdir, 'analysis_summary.json'), 'w') as f: - json.dump(analysis_summary, f, indent=4) - f.write('\n') - - return analysis_summary - - -def blast_scaffolds(inputs, outdir, out_name, threads): - """ - Scaffold sequences are aligned to reference sequences using BLASTn. - - :param inputs: Dictionary of input files, with keys 'scaffolds' and 'database'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: Path - :param out_name: Name used for outputs. - :type out_name: str - :param threads: Number of threads to use for BLASTn. - :type threads: int - :return: BLASTn results. - """ - log.info('BLASTing scaffolds against reference sequences...') - log_dir = os.path.join(outdir, 'logs') - os.makedirs(log_dir, exist_ok=True) - - timestamp_analysis_start = datetime.datetime.now().isoformat() - - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - db = inputs.get('database', None) - - missing_db_files = [] - db = os.path.abspath(db) - for suffix in ['nhr', 'nin', 'nsq']: - db_file = db + '.' 
+ suffix - if not os.path.exists(db_file): - missing_db_files.append(db_file) - - if missing_db_files: - terminal_command = (f'makeblastdb -in {db} -dbtype nucl') - process_name = 'makeblastdb_scaffolds' - error_code = 11 - run(terminal_command, outdir, out_name, process_name, error_code) - else: - log.info(f'BLAST database already exists for {db} Skipping BLAST db creation.') - - scaffolds_path = inputs.get('scaffolds', None) - blast_output = os.path.join(outdir, f'{out_name}_scaffolds_blast.tsv') - cols = [ - 'qseqid', - 'sseqid', - 'bitscore', - 'sstart', - 'send', - 'qseq', - 'sseq' - ] - cols_str = ' '.join(cols) - with open(blast_output, 'w') as f: - f.write('\t'.join(cols) + '\n') - - num_db_seqs = sum(line[0] == '>' for line in open(db, 'r').readlines()) - terminal_command = (f'blastn -query {scaffolds_path} -db {db} ' - f'-num_threads {threads} -max_target_seqs {num_db_seqs} ' - f'-outfmt "6 {cols_str}" >> {blast_output}') - process_name = 'blastn_scaffolds' - error_code = 12 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running BLASTn (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - blast_results = pd.read_csv(blast_output, sep='\t') - num_blast_results = len(blast_results) - if num_blast_results == 0: - log.error(f'No scaffolds aligned to reference sequences! ' - f'Aborting analysis.\n') - error_code = 13 - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - log.info('Scaffolds aligned to reference sequences.') - log.info(f'Found {num_blast_results} total scaffold BLAST alignments.') - - filtered_blast_results = filter_scaffold_blast_results(blast_results) - - num_filtered_blast_results = len(filtered_blast_results) - log.info(f'Remaining scaffold alignments after filtering: {num_filtered_blast_results}.') - filtered_blast_output = os.path.join(outdir, f'{out_name}_scaffolds_blast_filtered.tsv') - filtered_blast_results.to_csv(filtered_blast_output, sep='\t', index=False) - log.info(f'Wrote filtered scaffold alignments to {filtered_blast_output}') - - timestamp_analysis_complete = datetime.datetime.now().isoformat() - - outputs = { - 'all_scaffold_blast_results': os.path.abspath(blast_output), - 'filtered_scaffold_blast_results': os.path.abspath(filtered_blast_output), - } - analysis_summary['process_name'] = process_name - analysis_summary['timestamp_analysis_complete'] = timestamp_analysis_complete - analysis_summary['return_code'] = return_code - analysis_summary['outputs'] = outputs - - with open(os.path.join(outdir, 'analysis_summary.json'), 'w') as f: - json.dump(analysis_summary, f, indent=4) - f.write('\n') - - return analysis_summary - - - -def make_mapping_refs(inputs, outdir, out_name): - """ - Mapping references are created for each genome segment. These consist of - the scaffold for that segment, with all Ns in the scaffold filled-in using - the corresponding positions from that scaffold's best-matching reference - sequence. - - :param inputs: Dictionary of input files, with keys 'filtered_scaffold_blast_results' and 'database'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: Path - :param out_name: Name used for outputs. 
- :type out_name: str - - """ - log.info('Creating mapping references...') - - filtered_scaffold_blast_results_path = inputs.get('filtered_scaffold_blast_results', None) - blast_results = pd.read_csv(filtered_scaffold_blast_results_path, sep='\t') - db = inputs.get('database', None) - db = os.path.abspath(db) - - # Create dict with best-matching ref seq for each segment. - sseqids = blast_results['sseqid'].unique() - best_ref_seqs = {seq_name: '' for seq_name in sseqids} - with open(db, 'r') as input_file: - for line in input_file: - if line[0] == '>': - header = line.strip()[1:] - elif header in best_ref_seqs: - best_ref_seqs[header] += line.strip() - - # Create mapping ref for each segment. - def make_map_ref(data_frame): - data_frame = data_frame.sort_values(by=['sstart', 'send'], - ascending=[True, False]) - ref_seq = best_ref_seqs[data_frame['sseqid'].min()] - last_position = 0 - seq = '' - for index, row in data_frame.iterrows(): - if row['sstart'] > last_position: - seq += ref_seq[last_position:row['sstart'] - 1] - if row['send'] > last_position: - qseq = row['qseq'].upper() - sseq = row['sseq'].upper() - if row['sstart'] <= last_position: - start = (last_position - row['sstart']) + 1 - qseq = qseq[start:] - sseq = sseq[start:] - for qbase, sbase in zip(qseq, sseq): - if qbase in 'ATGC': - seq += qbase - else: - seq += sbase - last_position = row['send'] - seq += ref_seq[last_position:].upper() - seq = seq.replace('-', '') - return seq - - cols = ['sseqid', 'sstart', 'send', 'qseq', 'sseq'] - group_cols = ['sseqid'] - blast_results = blast_results[cols] - blast_results = blast_results.groupby(group_cols).apply(make_map_ref) - blast_results = blast_results.reset_index() - blast_results.columns = ['sseqid', 'mapping_seq'] - - # Annotate segment and subtype. - get_segment = lambda row: row['sseqid'].split('|')[2].split('_')[0] - blast_results['segment'] = blast_results.apply(get_segment, axis=1) - get_subtype = lambda row: row['sseqid'].split('|')[3].split('_')[0] - blast_results['subtype'] = blast_results.apply(get_subtype, axis=1) - - # Write mapping refs to FASTA. - segment_order = 'PB2 PB1 PA HA NP NA M NS'.split(' ') - get_segment_order = lambda row: segment_order.index(row['segment']) - blast_results['sort'] = blast_results.apply(get_segment_order, axis=1) - blast_results = blast_results.sort_values(by='sort') - - mapping_refs_path = os.path.join(outdir, f'{out_name}_mapping_refs.fa') - with open(mapping_refs_path, 'w') as f: - num_refs = 0 - for index, row in blast_results.iterrows(): - num_refs += 1 - accession, ref_name, segment, subtype = row['sseqid'].split('|')[:4] - accession = accession.lstrip('>') - ref_name = ref_name.replace('(', '|').replace(')', '') - header = f'>{out_name}|{segment}|{subtype}|{accession}|{ref_name}' - seq = row['mapping_seq'] - f.write(header + '\n') - f.write(seq + '\n') - - log.info(f'Wrote {num_refs} mapping references to {mapping_refs_path}') - - return mapping_refs_path - - -def map_reads(inputs, outdir, out_name, min_qual): - """ - Normalized, downsampled reads (from normalize_depth analysis stage) are mapped to the - mapping references (produced by make_mapping_refs func) using BWA mem. The alignment - is filtered to retain only paired reads, then sorted and indexed. - - :param inputs: Dictionary of input files, with keys 'normalized_reads_fwd', 'normalized_reads_rev' and 'mapping_refs'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: Path - :param out_name: Name used for outputs. 
- :type out_name: str - :param min_qual: Minimum mapping quality. - :type min_qual: int - """ - log.info('Mapping reads to references...') - log_dir = os.path.join(outdir, 'logs') - os.makedirs(log_dir, exist_ok=True) - timestamp_analysis_start = datetime.datetime.now().isoformat() - - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - - mapping_refs_path = make_mapping_refs(inputs, outdir, out_name) - - terminal_command = (f'bwa index {mapping_refs_path}') - process_name = 'bwa_index' - error_code = 14 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running BWA index (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - fwd_reads = inputs.get('normalized_reads_fwd', None) - rev_reads = inputs.get('normalized_reads_rev', None) - alignment_path = os.path.join(outdir, f'{out_name}_alignment.sam') - terminal_command = (f'bwa mem {mapping_refs_path} {fwd_reads} {rev_reads} ' - f'> {alignment_path}') - process_name = 'bwa_mem' - error_code = 15 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running BWA mem (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - filtered_alignment_path = os.path.join(outdir, f'{out_name}_alignment.bam') - samtools_filter_flags = '2828' - log.info(f'Filtering alignment with sam flags: {samtools_filter_flags}.') - log.info(f'See: https://broadinstitute.github.io/picard/explain-flags.html for info on sam flags.') - log.info('Removing unmapped reads, secondary alignments, and supplementary alignments.') - log.info(f'Minimum mapping quality: {min_qual}') - terminal_command = (f'samtools view -f 1 -F {samtools_filter_flags} -q {min_qual} ' - f'-h {alignment_path} | samtools sort -o {filtered_alignment_path}') - process_name = 'samtools_view' - error_code = 16 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running samtools view (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - log.info(f'Indexing alignment...') - terminal_command = (f'samtools index {filtered_alignment_path}') - process_name = 'samtools_index' - error_code = 17 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running samtools index (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - log.info(f'Wrote alignment to {filtered_alignment_path}') - - outputs = { - 'mapping_refs': os.path.abspath(mapping_refs_path), - 'alignment': os.path.abspath(filtered_alignment_path), - 'alignment_index': os.path.abspath(filtered_alignment_path + '.bai'), - } - timestamp_analysis_complete = datetime.datetime.now().isoformat() - analysis_summary['process_name'] = 'map_reads' - analysis_summary['outputs'] = outputs - analysis_summary['return_code'] = return_code - analysis_summary['timestamp_analysis_complete'] = timestamp_analysis_complete - - with 
open(os.path.join(outdir, 'analysis_summary.json'), 'w') as f: - json.dump(analysis_summary, f, indent=4) - f.write('\n') - - return analysis_summary - - -def mask_ambig_low_cov( - variants_raw_path: Path, - outdir: Path, - out_name: str, - params: dict, -): - """ - The FreeBayes VCF output is parsed, analyzing total read depth and - variant read depth at each position. - - This allows positions to be masked for low coverage based on the read - depth considered by FreeBayes (which could be lower than the read depth - in the BAM depending on how FreeBayes applies it mapping quality and - base quality filters). - - This also allows positions to be masked as ambiguous when the number of - reads differing from the reference exceeds a threshold, but is not sufficient - enough to confidently call as a specific variant. - - :param variants_raw_path: Path to the raw FreeBayes VCF output. - :type variants_raw_path: str - :param outdir: Path to the output directory. - :type outdir: str - :param out_name: Name used for outputs. - :type out_name: str - :param params: Dictionary of parameters, with keys 'min_depth', 'min_qual', 'vaf_call', 'vaf_ambig'. - :type params: dict - :return: Output files. Keys are 'filtered_variants' and 'depth_of_cov'. - :rtype: dict - """ - log.info('Masking ambiguous and low coverage positions...') - outputs = {} - # Open input/output files and initialize dicts. - variants_raw = open(variants_raw_path, 'r') - variants_filtered_path = os.path.join(outdir, f'{out_name}_variants.vcf') - variants_filtered = open(variants_filtered_path, 'w') - depth_of_cov_path = os.path.join(outdir, f'{out_name}_depth_of_cov_freebayes.tsv') - depth_of_cov = open(depth_of_cov_path, 'w') - segments = 'PB2 PB1 PA HA NP NA M NS'.split(' ') - low_cov_pos = {segment: set() for segment in segments} - ambig_pos = {segment: set() for segment in segments} - variant_pos = {segment: set() for segment in segments} - segment_name, segment_length = dict(), {None: 0} - segment_length[None] = 0 - - # Parse header - line = variants_raw.readline() - while line != '' and line[0] == '#': - variants_filtered.write(line) - if line[:10] == '##contig=<': - name = line.strip().split('')[0]) - segment_name[segment] = name - segment_length[segment] = length - line = variants_raw.readline() - - # Parse body - last_segment = None - last_position = 0 - min_depth = params['min_depth'] - min_qual = params['min_mapping_quality'] - vaf_call = params['variant_threshold_calling'] - vaf_ambig = params['variant_threshold_masking'] - while line != '': - fields = line.strip().split('\t') - fields = (fields[0], fields[1], fields[3], fields[4], fields[5], - fields[8], fields[9]) - name, position, ref, alt, qual, keys, values = fields - segment = name.split('|')[1] - if segment != last_segment: - if last_position < segment_length[last_segment]: - for p in range(last_position + 1, - segment_length[last_segment] + 1): - low_cov_pos[last_segment].add(p) - depth_of_cov_line = [name, str(p), '0'] - depth_of_cov_line = '\t'.join(depth_of_cov_line) - depth_of_cov.write(depth_of_cov_line + '\n') - last_position = 0 - last_segment = segment - position = int(position) - if position != last_position + 1: - for p in range(last_position + 1, position): - low_cov_pos[segment].add(p) - depth_of_cov_line = [name, str(p), '0'] - depth_of_cov_line = '\t'.join(depth_of_cov_line) - depth_of_cov.write(depth_of_cov_line + '\n') - qual = float(qual) - info = {k: v for k, v in zip(keys.split(':'), values.split(':'))} - if 'DP' in info and 
info['DP'].isnumeric(): - total_depth = int(info['DP']) - else: - total_depth = 0 - if 'AO' in info: - alt_depths = tuple(int(i) if i.isnumeric() else 0 - for i in info['AO'].split(',')) - else: - alt_depths = (0, ) - max_alt_depth = max(alt_depths) - total_alt_depth = sum(alt_depths) - max_vaf = max_alt_depth / total_depth if total_depth > 0 else 0 - total_vaf = total_alt_depth / total_depth if total_depth > 0 else 0 - if all([qual >= min_qual, max_vaf >= vaf_call, - total_depth >= min_depth]): - variants_filtered.write(line) - variant_pos[segment].add(position) - position -= 1 - for p in ref: - position += 1 - depth_of_cov_line = [name, str(position), str(total_depth)] - depth_of_cov_line = '\t'.join(depth_of_cov_line) - depth_of_cov.write(depth_of_cov_line + '\n') - if total_depth < min_depth: - low_cov_pos[segment].add(position) - elif total_vaf >= vaf_ambig and max_vaf < vaf_call: - ambig_pos[segment].add(position) - last_position = position - line = variants_raw.readline() - if last_position < segment_length[last_segment]: - for p in range(last_position + 1, segment_length[last_segment] + 1): - low_cov_pos[last_segment].add(p) - depth_of_cov_line = [name, str(p), '0'] - depth_of_cov_line = '\t'.join(depth_of_cov_line) - depth_of_cov.write(depth_of_cov_line + '\n') - - # Close input/output files - variants_raw.close() - variants_filtered.close() - depth_of_cov.close() - - outputs['filtered_variants'] = os.path.abspath(variants_filtered_path) - log.info(f'Wrote filtered variants to {variants_filtered_path}') - outputs['depth_of_cov'] = os.path.abspath(depth_of_cov_path) - log.info(f'Wrote depth of coverage to {depth_of_cov_path}') - - # Convert sets of low cov positions into tuples representing zero-indexed - # spans of masked positions (start, end). - masked_pos = dict() - for segment in 'PB2 PB1 PA HA NP NA M NS'.split(' '): - masked_pos[segment] = low_cov_pos[segment].union(ambig_pos[segment]) - masked_pos[segment] = sorted(masked_pos[segment]) - spans = {segment: set() for segment in segments} - segments = [segment for segment in segments - if masked_pos[segment] != list()] - for segment in segments: - span_start = masked_pos[segment][0] - for pos_A, pos_B in zip(masked_pos[segment][:-1], - masked_pos[segment][1:]): - if pos_B != pos_A + 1: - span_end = pos_A - spans[segment].add((span_start - 1, span_end - 1)) - span_start = pos_B - span_end = masked_pos[segment][-1] - spans[segment].add((span_start - 1, span_end - 1)) - spans = {segment: sorted(spans[segment]) for segment in segments} - - # Write spans of low cov positions to TSV file for depth of coverage - # plots. - low_cov_path = os.path.join(outdir, f'{out_name}_low_cov.tsv') - with open(low_cov_path, 'w') as f: - for segment, segment_spans in spans.items(): - for (start, end) in segment_spans: - line = [segment_name[segment], start, end] - line = '\t'.join(str(i) for i in line) - f.write(line + '\n') - log.info(f'Wrote low coverage positions to {low_cov_path}') - outputs['low_cov'] = os.path.abspath(low_cov_path) - - # Write ambiguous positions to TSV file. - ambig_path = os.path.join(outdir, f'{out_name}_ambig.tsv') - with open(ambig_path, 'w') as f: - for segment in 'PB2 PB1 PA HA NP NA M NS'.split(' '): - for position in ambig_pos[segment]: - line = [segment_name[segment], position] - line = '\t'.join(str(i) for i in line) - f.write(line + '\n') - log.info(f'Wrote ambiguous positions to {ambig_path}') - outputs['ambiguous_positions'] = os.path.abspath(ambig_path) - - # Write variant positions to TSV file. 
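The thresholds applied above decide whether each FreeBayes record is kept as a called variant, masked as low coverage, or masked as ambiguous. A minimal, self-contained sketch of that decision, using the CLI default thresholds and hypothetical depths (not code from this module):

```python
def classify_position(qual, total_depth, alt_depths,
                      min_qual=20, min_depth=20, vaf_call=0.75, vaf_ambig=0.25):
    """Sketch of how mask_ambig_low_cov categorizes a single VCF record."""
    max_alt_depth = max(alt_depths) if alt_depths else 0
    total_alt_depth = sum(alt_depths)
    max_vaf = max_alt_depth / total_depth if total_depth > 0 else 0
    total_vaf = total_alt_depth / total_depth if total_depth > 0 else 0
    categories = []
    if qual >= min_qual and max_vaf >= vaf_call and total_depth >= min_depth:
        categories.append('variant')        # written to the filtered VCF
    if total_depth < min_depth:
        categories.append('low_coverage')   # masked in the consensus
    elif total_vaf >= vaf_ambig and max_vaf < vaf_call:
        categories.append('ambiguous')      # masked in the consensus
    return categories or ['reference']


# Hypothetical record: 30 reads considered by FreeBayes, 10 supporting an alt allele.
print(classify_position(qual=60.0, total_depth=30, alt_depths=(10,)))  # ['ambiguous']
```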
- variants_tsv_path = os.path.join(outdir, f'{out_name}_variants.tsv') - with open(variants_tsv_path, 'w') as f: - for segment in 'PB2 PB1 PA HA NP NA M NS'.split(' '): - for position in variant_pos[segment]: - line = [segment_name[segment], position] - line = '\t'.join(str(i) for i in line) - f.write(line + '\n') - log.info(f'Wrote variant positions to {variants_tsv_path}') - outputs['variant_positions'] = os.path.abspath(variants_tsv_path) - - # Write spans of masked positions to BED file in BedGraph format. - masked_path = os.path.join(outdir, f'{out_name}_masked.bed') - with open(masked_path, 'w') as f: - for segment, segment_spans in spans.items(): - for (start, end) in segment_spans: - line = [segment_name[segment], start, end + 1, 0] - line = '\t'.join(str(i) for i in line) - f.write(line + '\n') - log.info(f'Wrote masked positions to {masked_path}') - outputs['masked_positions'] = os.path.abspath(masked_path) - - return outputs - - -def call_variants(inputs, outdir, out_name, params): - """ - FreeBayes is used to create a pileup and call variants from the - BAM file output (map_reads func). - - :param inputs: Dictionary of input files, with keys 'mapping_refs' and 'alignment'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: Path - :param out_name: Name used for outputs. - :type out_name: str - :param min_qual: Minimum base quality. - :type min_qual: int - :param max_depth: Maximum read depth. - :type max_depth: int - :return: Summary of the analysis. - :rtype: dict - """ - log.info('Calling variants...') - log_dir = os.path.join(outdir, 'logs') - os.makedirs(log_dir, exist_ok=True) - - timestamp_analysis_start = datetime.datetime.now().isoformat() - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - - mapping_refs = inputs.get('mapping_refs', None) - filtered_alignment = inputs.get('alignment', None) - - variants_raw_path = os.path.join(outdir, f'{out_name}_variants_raw.vcf') - min_qual = params.get('min_mapping_quality', 20) - max_depth = params.get('coverage_limit', 1000) - - log.info(f'Minimum mapping quality: {min_qual}') - log.info(f'Minimum base quality: {min_qual}') - log.info(f'Maximum read depth: {max_depth}') - terminal_command = (f'freebayes -f {mapping_refs} {filtered_alignment} -p 1 ' - f'--limit-coverage {max_depth} ' - f'--min-mapping-quality {min_qual} ' - f'--min-base-quality {min_qual} --pooled-continuous ' - f'--report-monomorphic --haplotype-length 0 ' - f'--min-alternate-count 1 --min-alternate-fraction 0 ' - f'> {variants_raw_path}') - process_name = 'freebayes' - error_code = 18 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running FreeBayes (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - log.info(f'Wrote raw variants to {variants_raw_path}') - - masking_outputs = mask_ambig_low_cov( - variants_raw_path, - outdir, - out_name, - params, - ) - - outputs = { - 'variants_raw': os.path.abspath(variants_raw_path), - 'variants_filtered': os.path.abspath(masking_outputs['filtered_variants']), - 'masked_positions': os.path.abspath(masking_outputs['masked_positions']), - 'depth_of_cov_freebayes': os.path.abspath(masking_outputs['depth_of_cov']), - 'low_coverage_positions': os.path.abspath(masking_outputs['low_cov']), - 'ambiguous_positions': 
os.path.abspath(masking_outputs['ambiguous_positions']), - 'variant_positions': os.path.abspath(masking_outputs['variant_positions']), - } - - analysis_summary['process_name'] = 'call_variants' - timestamp_analysis_complete = datetime.datetime.now().isoformat() - analysis_summary['timestamp_analysis_complete'] = timestamp_analysis_complete - analysis_summary['return_code'] = return_code - analysis_summary['outputs'] = outputs - - with open(os.path.join(outdir, 'analysis_summary.json'), 'w') as f: - json.dump(analysis_summary, f, indent=4) - f.write('\n') - - return analysis_summary - - -def make_consensus_seqs(inputs, outdir, out_name): - """ - High quality variants and masked positions (mask_ambig_low_cov func) are - applied to the mapping references (make_mapping_refs) to generate the final - consensus sequences for each segment. - - :param inputs: Dictionary of input files, with keys 'variants_filtered' and 'mapping_refs'. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: Path - :param out_name: Name used for outputs. - :type out_name: str - """ - log.info('Generating consensus sequences...') - log_dir = os.path.join(outdir, 'logs') - os.makedirs(log_dir, exist_ok=True) - - timestamp_analysis_start = datetime.datetime.now().isoformat() - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - outputs = {} - - # Zip and index VCF. - variants = inputs.get('variants_filtered', None) - zipped_variants = os.path.join(outdir, f'{out_name}_variants.bcf') - terminal_command = (f'bcftools view {variants} -Ob -o {zipped_variants}') - process_name = 'bcftools_view' - error_code = 19 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running bcftools view (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - terminal_command = (f'bcftools index {zipped_variants}') - process_name = 'bcftools_index' - error_code = 20 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running bcftools index (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - # Apply variants to mapping refs. - log.info('Applying variants to mapping references...') - mapping_refs = inputs.get('mapping_refs', None) - masked = inputs.get('masked_positions', None) - consensus_seqs = os.path.join(outdir, f'{out_name}_consensus_seqs.fa') - terminal_command = (f'cat {mapping_refs} | bcftools consensus -m {masked} ' - f'{zipped_variants} > {consensus_seqs}') - process_name = 'bcftools_consensus' - error_code = 21 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error(f'Error running bcftools consensus (Exit status: {return_code})') - analysis_summary['return_code'] = error_code - analysis_summary['error_message'] = error_messages_by_code[error_code] - return analysis_summary - - # Reformat FASTA headers and remove whitespace. 
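For illustration, the header clean-up performed below keeps only the first three |-delimited fields of each consensus header; the sequence name and accession here are hypothetical:

```python
# Hypothetical consensus header produced from a mapping reference:
header = '>sample-01|HA|H3|MK000001|A/British_Columbia/hypothetical/2024|H3N2'
trimmed = '|'.join(header.split('|')[:3]) + '|'
print(trimmed)  # >sample-01|HA|H3|
```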
- clean_seqs = {} - with open(consensus_seqs, 'r') as f: - for line in f: - if line[0] == '>': - header = line.strip() - clean_seqs[header] = '' - else: - clean_seqs[header] += line.strip().upper() - - with open(consensus_seqs, 'w') as f: - for header, seq in clean_seqs.items(): - header = '|'.join(header.split('|')[:3]) + '|' - f.write(header + '\n') - f.write(seq + '\n') - - log.info(f'Wrote consensus sequences to {consensus_seqs}') - outputs['consensus_seqs'] = os.path.abspath(consensus_seqs) - - # Check that consensus seq lenghts are within expected range. ''' - segment_lengths = {'PB2': (2260, 2360), 'PB1': (2260, 2360), 'PA': (2120, 2250), - 'HA': (1650, 1800), 'NP': (1480, 1580), 'NA': (1250, 1560), - 'M': (975, 1030), 'NS': (815, 900)} - for header, seq in clean_seqs.items(): - segment = header.split('|')[1] - min_length = segment_lengths[segment][0] - max_length = segment_lengths[segment][1] - if not (min_length <= len(seq) <= max_length): - log.error(f'The consensus sequence generated for segment ' - f'{segment} is not within the expected length range ' - f'({min_length} to {max_length} bases).\n') - exit(22) - - timestamp_analysis_complete = datetime.datetime.now().isoformat() - analysis_summary['process_name'] = 'make_consensus_seqs' - analysis_summary['timestamp_analysis_complete'] = timestamp_analysis_complete - analysis_summary['return_code'] = return_code - analysis_summary['outputs'] = outputs - - return analysis_summary diff --git a/fluviewer/cli_args.py b/fluviewer/cli_args.py deleted file mode 100644 index f009109..0000000 --- a/fluviewer/cli_args.py +++ /dev/null @@ -1,98 +0,0 @@ -import argparse -import os -from pathlib import Path - -def parse_args(): - """ - Parse command line arguments using argparse. - - :return: Dictionary of arguments and their values. - :rtype: dict - """ - parser = argparse.ArgumentParser(description='BCCDC-PHL/FluViewer: Influenza A virus consensus sequence generation and variant calling') - parser.add_argument('-f', '--forward-reads', type=Path, required=True, help='Path to FASTQ file containing forward reads') - parser.add_argument('-r', '--reverse-reads', type=Path, required=True, help='Path to FASTQ file containing reverse reads') - parser.add_argument('-d', '--db', type=Path, required=True, help='Path to FASTA file containing FluViewer database') - parser.add_argument('-o', '--outdir', type=Path, help='Output directory (default=FluViewer_)') - parser.add_argument('-n', '--output-name', type=str, required=True, help='Output name. 
Includes this name in output files, and in consensus sequence headers') - parser.add_argument('-i', '--min-identity', type=float, default=90, metavar="[0-100]", help='Minimum percent sequence identity between database reference sequences and contigs (default=90)') - parser.add_argument('-l', '--min-alignment-length', type=int, default=50, metavar="[32-]", help='Minimum length of alignment between database reference sequences and contigs (default=50)') - parser.add_argument('-D', '--min-depth', type=int, default=20, metavar="[1-]", help='Minimum read depth for base calling (default=20)') - parser.add_argument('-q', '--min-mapping-quality', type=int, default=20, metavar="[0-]", help='Minimum PHRED score for mapping quality and base quality during variant calling (default=20)') - parser.add_argument('-v', '--variant-threshold-calling', type=float, default=0.75, metavar="[0-1]", help='Variant allele fraction threshold for calling variants (default=0.75)') - parser.add_argument('-V', '--variant-threshold-masking', type=float, default=0.25, metavar="[0-1]", help='Variant allele fraction threshold for masking ambiguous variants (default=0.25)') - parser.add_argument('-N', '--target-depth', type=int, default=200, metavar="[1-]", help='Target depth for pre-normalization of reads (default=200)') - parser.add_argument('-L', '--coverage-limit', type=int, default=200, metavar="[1-]", help='Coverage depth limit for variant calling (default=200)') - parser.add_argument('-t', '--threads', type=int, default=1, metavar="[1-]", help='Threads used for contig/scaffold alignments (default=1)') - parser.add_argument('-M', '--max-memory', type=int, metavar="[1-]", help='Gigabytes of memory allocated for normalizing reads (default=max)') - parser.add_argument('-g', '--disable-garbage-collection', action='store_true', help='Disable garbage collection and retain intermediate analysis files') - parser.add_argument('--log-level', default='info', choices=['info', 'debug'], help='Log level (default=info)') - - args = parser.parse_args() - - return args - - - -def validate_args(args): - """ - Validate the arguments provided by the user. - - :param args: Dictionary of arguments and their values. 
- :type args: argparse.Namespace - """ - independent_validation_rules = { - 'forward_reads': { 'validation_fn': lambda x: os.path.isfile(x), - 'error_msg': 'Input file does not exist: {0}' }, - 'reverse_reads': { 'validation_fn': lambda x: os.path.isfile(x), - 'error_msg': 'Input file does not exist: {0}' }, - 'db': { 'validation_fn': lambda x: os.path.isfile(x), - 'error_msg': 'Input file does not exist: {0}' }, - 'outdir': { 'validation_fn': lambda x: True, - 'error_msg': None }, - 'min_identity': { 'validation_fn': lambda x: 0 <= x <= 100, - 'error_msg': 'Minimum percent sequence identity must be between 0 and 100: {0}' }, - 'min_alignment_length': { 'validation_fn': lambda x: x >= 32, - 'error_msg': 'Minimum alignment length must be at least 32: {0}' }, - 'min_depth': { 'validation_fn': lambda x: x >= 1, - 'error_msg': 'Minimum read depth must be at least 1: {0}' }, - 'min_mapping_quality': { 'validation_fn': lambda x: x >= 0, - 'error_msg': 'Minimum PHRED score must be at least 0: {0}' }, - 'variant_threshold_calling': { 'validation_fn': lambda x: 0 <= x <= 1, - 'error_msg': 'Variant allele fraction threshold for calling variants must be between 0 and 1: {0}' }, - 'variant_threshold_masking': { 'validation_fn': lambda x: 0 <= x <= 1, - 'error_msg': 'Variant allele fraction threshold for masking ambiguous variants must be between 0 and 1: {0}' }, - 'target_depth': { 'validation_fn': lambda x: x >= 1, - 'error_msg': 'Target depth for pre-normalization of reads must be at least 1: {0}' }, - 'coverage_limit': { 'validation_fn': lambda x: x >= 1, - 'error_msg': 'Coverage depth limit for variant calling must be at least 1: {0}' }, - 'threads': { 'validation_fn': lambda x: x >= 1, - 'error_msg': 'Threads used for contig/scaffold alignments must be at least 1: {0}' }, - 'max_memory': { 'validation_fn': lambda x: x is None or x >= 1, - 'error_msg': 'Gigabytes of memory allocated for normalizing reads must be at least 1: {0}' } - } - - for arg_name, rule in independent_validation_rules.items(): - validation_fn = rule['validation_fn'] - arg_value = getattr(args, arg_name) - valid = validation_fn(arg_value) - if not valid: - raise ValueError(rule['error_msg'].format(arg_value)) - - interdependent_validation_rules = { - ('variant_threshold_masking', 'variant_threshold_calling'): { - 'validation_fn': lambda x, y: x < y, - 'error_msg': 'Variant allele fraction for masking ambiguous variants must be lower than variant allele fraction for calling variants. variant_threshold_masking: {0}, variant_threshold_calling: {1}' - } - } - - for arg_pair, rule in interdependent_validation_rules.items(): - arg_1_value = getattr(args, arg_pair[0]) - arg_2_value = getattr(args, arg_pair[1]) - validation_fn = rule['validation_fn'] - valid = validation_fn(arg_1_value, arg_2_value) - if not valid: - raise ValueError(rule['error_msg'].format(arg_1_value, arg_2_value)) - - - return args diff --git a/fluviewer/database.py b/fluviewer/database.py deleted file mode 100644 index 8c3da06..0000000 --- a/fluviewer/database.py +++ /dev/null @@ -1,118 +0,0 @@ -import logging -import os - -from collections import Counter -from math import ceil, log10 - - -log = logging.getLogger(__name__) - - -def check_database(db, outdir, out_name): - """ - Checks the contents of the provided reference sequence database to - ensure proper header formatting and unambiguous sequences. - - :param db: Path to the reference sequence database. - :type db: Path - :param outdir: Path to the output directory. 
- :type outdir: str - :return: None - :rtype: None - """ - log.info('Checking reference sequence database...') - seqs = {} - headers = [] - with open(db, 'r') as f: - for line in f: - if line.startswith('>'): - header = line.strip() - headers.append(header) - seqs[header] = '' - else: - seqs[header] += line.strip() - headers = Counter(headers) - for header in headers: - if headers[header] > 1: - log.error(f'The following header is used for multiple sequences:' - f'\n{header}\n') - exit(1) - - num_seqs = len(seqs) - one_tenth_of_seqs = ceil(num_seqs / 10) - log.info(f'Found {num_seqs} sequences in database.') - - log.info('Checking database sequences...') - - segment_lengths = { - 'PB2': (2260, 2360), - 'PB1': (2260, 2360), - 'PA': (2120, 2250), - 'HA': (1650, 1800), - 'NP': (1480, 1580), - 'NA': (1250, 1560), - 'M': (975, 1030), - 'NS': (815, 900), - } - - log.info('Checking sequence headers') - log.info('Checking sequence lengths within expected ranges for each segment') - log.info('Checking for ambiguous or lower-case nucleotides') - - num_seqs_checked = 0 - for header, seq in seqs.items(): - - # - # Check header formatting - if len(header.split('|')) != 4: - log.error(f'The header for the following database entry does not contain the expected number of |-delimited fields:\n{header}\n') - exit(1) - else: - accession, name, segment, subtype = header.split('|') - - # Check that the strain name is formatted correctly - # (e.g. "A/California/04/2009(H1N1)") - if any([name.count('(') != 1, name.count(')') != 1, - name[-1] != ')', name.index('(') > name.index(')')]): - log.error(f'The strain_name(strain_subtype) for the following database entry is improperly formatted:\n{header}\n') - exit(1) - if segment not in segment_lengths.keys(): - log.error(f'The segment indicated for the following database entry is not recognized:\n{header}\n') - exit(1) - if len(seq) != sum(seq.count(base) for base in 'ATGC'): - log.error(f'The sequence provided for the following database entry ' - f'contains ambiguous or lower-case nucleotides:\n{header}\n') - exit(1) - - # - # Check sequence lengths - min_length = segment_lengths[segment][0] - max_length = segment_lengths[segment][1] - if not (min_length <= len(seq) <= max_length): - log.error(f'The sequence provided for the following database entry ' - f'is not within the expected length range for its indicated ' - f'segment ({segment}: {min_length} to {max_length} bases): ' - f'\n{header}\n') - exit(1) - - num_seqs_checked += 1 - if num_seqs_checked % one_tenth_of_seqs == 0: - percent_seqs_checked = num_seqs_checked / num_seqs * 100 - log.info(f'Checked {num_seqs_checked} ({percent_seqs_checked:.2f}%) database sequences...') - - percent_seqs_checked = num_seqs_checked / num_seqs * 100 - - log.info(f'Confirmed {num_seqs} ({percent_seqs_checked}%) database sequence headers are properly formatted.') - log.info(f'Confirmed {num_seqs} ({percent_seqs_checked}%) database sequences are within expected length ranges for their indicated segments.') - log.info(f'Confirmed {num_seqs} ({percent_seqs_checked}%) database sequences do not contain ambiguous or lower-case nucleotides.') - - db = os.path.abspath(db) - db_suffixes = ['nhr', 'nin', 'nsq'] - if any([not os.path.exists(db + '.' 
+ suffix) for suffix in db_suffixes]): - terminal_command = f'makeblastdb -in {db} -dbtype nucl' - process_name = 'makeblastdb_contigs' - error_code = 5 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error('Error running makeblastdb.') - exit(1) diff --git a/fluviewer/fluviewer.py b/fluviewer/fluviewer.py deleted file mode 100644 index 6d40a57..0000000 --- a/fluviewer/fluviewer.py +++ /dev/null @@ -1,532 +0,0 @@ -import argparse -import json -import logging -import os -import shutil -import subprocess -import sys - -from collections import Counter -from math import ceil -from pathlib import Path - -import numpy as np -import pandas as pd - -from . import __version__ - -from . import cli_args -from . import logging_config -from . import database -from . import analysis -from . import report -from . import plots - - -log = logging.getLogger(__name__) - -def main(): - version = __version__ - - args = cli_args.parse_args() - try: - args = cli_args.validate_args(args) - except ValueError as e: - print(f'Error: {e}', sys.stderr) - exit(1) - - - if not os.path.exists(args.outdir): - os.makedirs(args.outdir) - else: - print(f'Output directory already exists: {args.outdir}', sys.stderr) - exit(1) - - log_level = getattr(logging, args.log_level.upper()) - print(log_level) - if not isinstance(log_level, int): - raise ValueError(f'Invalid log level: {args.log_level}') - - logs_dir = os.path.join(args.outdir, 'logs') - os.makedirs(logs_dir, exist_ok=True) - log_file = os.path.join(logs_dir, 'fluviewer.log') - logging_config.configure_logging(log_level, log_file) - - log.info(f'BCCDC-PHL/FluViewer v{version}') - version_split = version.split('-') - log.info(f'Derived from: KevinKuchinski/FluViewer v{version_split[0]}') - log.info(f'Inputs:') - log.info(f"Fwd reads: {args.forward_reads}") - log.info(f"Rev reads: {args.reverse_reads}") - log.info(f"Reference sequences: {args.db}") - - log.info(f"Outputs:") - log.info(f"Output directory: {args.outdir}") - log.info(f"Output name: {args.output_name}") - - log.info(f'Parameters:') - log.info(f"Minimum percent identity: {args.min_identity}") - log.info(f"Minimum alignment length: {args.min_alignment_length}") - log.info(f"Minimum read depth: {args.min_depth}") - log.info(f"Minimum mapping quality: {args.min_mapping_quality}") - log.info(f"Variant allele fraction threshold for calling variants: {args.variant_threshold_calling}") - log.info(f"Variant allele fraction threshold for masking ambiguous variants: {args.variant_threshold_masking}") - log.info(f"Target depth for pre-normalization of reads: {args.target_depth}") - log.info(f"Coverage depth limit for variant calling: {args.coverage_limit}") - - - database.check_database( - args.db, - args.outdir, - args.output_name, - ) - - analysis_stages = [ - 'normalize_depth', - 'assemble_contigs', - 'blast_contigs', - 'scaffolding', - 'read_mapping', - 'variant_calling', - 'consensus_calling', - 'summary_reporting', - ] - - - log.info('Starting analysis...') - - - # - # Stage 0: Normalize depth of reads. 
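Each of the stage blocks that follow writes its intermediate files into a zero-padded, index-prefixed subdirectory under `analysis_by_stage`. A minimal sketch of that naming scheme, with a hypothetical output directory:

```python
import os

analysis_stages = [
    'normalize_depth', 'assemble_contigs', 'blast_contigs', 'scaffolding',
    'read_mapping', 'variant_calling', 'consensus_calling', 'summary_reporting',
]

outdir = '/path/to/outdir'  # hypothetical
for index, stage in enumerate(analysis_stages):
    stage_outdir = os.path.join(outdir, 'analysis_by_stage', f'{index:02}_{stage}')
    print(stage_outdir)
# e.g. /path/to/outdir/analysis_by_stage/00_normalize_depth
#      ...
#      /path/to/outdir/analysis_by_stage/07_summary_reporting
```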
- # - current_analysis_stage = 'normalize_depth' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - log.info(f'Beginning analysis stage: {current_analysis_stage}') - log.info(f'Output directory: {current_analysis_stage_outdir}') - - current_analysis_stage_inputs = { - 'input_reads_fwd': os.path.abspath(args.forward_reads), - 'input_reads_rev': os.path.abspath(args.reverse_reads), - } - - normalize_depth_analysis_summary = analysis.normalize_depth( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - os.path.abspath(args.forward_reads), - os.path.abspath(args.reverse_reads), - args.target_depth, - args.max_memory, - ) - if normalize_depth_analysis_summary['return_code'] != 0: - log.error(f'Error in analysis stage: {current_analysis_stage}') - log.error(f'Error code: {normalize_depth_analysis_summary["return_code"]}') - exit(normalize_depth_analysis_summary['return_code']) - - # - # Publish outputs and logs - outputs_to_publish = { - } - for output_name, output_dir in outputs_to_publish.items(): - src_path = normalize_depth_analysis_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - - # - # Stage 1: Assemble contigs. 
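The output-publishing and log-publishing loops in the stage block above recur verbatim in every later stage. Factored into helpers, they might look roughly like this (a sketch only, not code from the original module):

```python
import os
import shutil


def publish_outputs(analysis_summary, outputs_to_publish, log):
    """Copy selected stage outputs into their destination directories."""
    for output_name, output_dir in outputs_to_publish.items():
        src_path = analysis_summary['outputs'][output_name]
        dest_path = os.path.join(output_dir, os.path.basename(src_path))
        shutil.copy(src_path, dest_path)
        log.info(f'Published output: {output_name} -> {dest_path}')


def publish_stage_logs(stage_outdir, logs_dir, log):
    """Copy a stage's log files into the top-level logs directory."""
    src_dir = os.path.join(stage_outdir, 'logs')
    dest_dir = os.path.join(logs_dir, os.path.basename(stage_outdir))
    os.makedirs(dest_dir, exist_ok=True)
    for log_file in os.listdir(src_dir):
        shutil.copy(os.path.join(src_dir, log_file), os.path.join(dest_dir, log_file))
        log.info(f'Published log file: {log_file} -> {dest_dir}')
```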
- # - current_analysis_stage = 'assemble_contigs' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - - current_analysis_stage_inputs = { - 'normalized_reads_fwd': normalize_depth_analysis_summary['outputs']['normalized_reads_fwd'], - 'normalized_reads_rev': normalize_depth_analysis_summary['outputs']['normalized_reads_rev'], - } - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - - log.info(f'Beginning analysis stage: {current_analysis_stage}') - - assemble_contigs_analysis_summary = analysis.assemble_contigs( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - ) - if assemble_contigs_analysis_summary['return_code'] != 0: - log.error(f'Error in analysis stage: {current_analysis_stage}') - log.error(f'Error code: {assemble_contigs_analysis_summary["return_code"]}') - exit(assemble_contigs_analysis_summary['return_code']) - - # - # Publish outputs and logs - outputs_to_publish = { - } - for output_name, output_dir in outputs_to_publish.items(): - src_path = assemble_contigs_analysis_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - - # - # Stage 2: Blast contigs. 
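Every stage call is followed by the same return-code check before the pipeline continues. Expressed as a helper, that convention might look like this (a sketch, not part of the original module):

```python
import logging
import sys

log = logging.getLogger(__name__)


def exit_on_stage_error(analysis_summary, stage_name):
    """Sketch of the return-code check repeated after each analysis stage."""
    if analysis_summary['return_code'] != 0:
        log.error(f'Error in analysis stage: {stage_name}')
        log.error(f'Error code: {analysis_summary["return_code"]}')
        sys.exit(analysis_summary['return_code'])
```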
- # - current_analysis_stage = 'blast_contigs' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - current_analysis_stage_inputs = { - 'contigs': assemble_contigs_analysis_summary['outputs']['contigs'], - 'database': os.path.abspath(args.db), - } - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - log.info(f'Beginning analysis stage: {current_analysis_stage}') - - blast_contigs_analysis_summary = analysis.blast_contigs( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - args.threads, - args.min_identity, - args.min_alignment_length, - ) - if blast_contigs_analysis_summary['return_code'] != 0: - log.error(f'Error in analysis stage: {current_analysis_stage}') - log.error(f'Error code: {blast_contigs_analysis_summary["return_code"]}') - exit(blast_contigs_analysis_summary['return_code']) - - # - # Publish outputs and logs - outputs_to_publish = { - } - for output_name, output_dir in outputs_to_publish.items(): - src_path = blast_contigs_analysis_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - - # - # Stage 3: Scaffolding. 
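`analysis.blast_contigs` (not shown in this hunk) applies the `--min-identity` and `--min-alignment-length` parameters to the contig BLAST hits. A rough sketch of that kind of filtering, assuming standard BLAST outfmt 6 column names and hypothetical hits:

```python
import pandas as pd

# Hypothetical contig BLAST hits (outfmt 6-style columns, values invented):
blast_results = pd.DataFrame({
    'qseqid': ['contig_1', 'contig_2'],
    'sseqid': ['MK000001|A/hypothetical/01/2024(H3N2)|HA|H3'] * 2,
    'pident': [99.1, 82.0],
    'length': [1650, 40],
})

min_identity = 90          # --min-identity default
min_alignment_length = 50  # --min-alignment-length default

filtered = blast_results[(blast_results['pident'] >= min_identity) &
                         (blast_results['length'] >= min_alignment_length)]
print(filtered)  # only contig_1 passes both thresholds
```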
- # - current_analysis_stage = 'scaffolding' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - current_analysis_stage_inputs = { - 'filtered_contig_blast_results': blast_contigs_analysis_summary['outputs']['filtered_contig_blast_results'], - } - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - - log.info(f'Beginning analysis stage: {current_analysis_stage}') - - make_scaffold_seqs_analysis_summary = analysis.make_scaffold_seqs( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - ) - if make_scaffold_seqs_analysis_summary['return_code'] != 0: - log.error(f'Error in analysis stage: {current_analysis_stage}') - log.error(f'Error code: {make_scaffold_seqs_analysis_summary["return_code"]}') - exit(make_scaffold_seqs_analysis_summary['return_code']) - - blast_scaffolds_inputs = { - 'scaffolds': make_scaffold_seqs_analysis_summary['outputs']['scaffolds'], - 'database': os.path.abspath(args.db), - } - - blast_scaffolds_analysis_summary = analysis.blast_scaffolds( - blast_scaffolds_inputs, - current_analysis_stage_outdir, - args.output_name, - args.threads, - ) - - # - # Publish outputs and logs - outputs_to_publish = { - } - for output_name, output_dir in outputs_to_publish.items(): - if output_name in make_scaffold_seqs_analysis_summary['outputs']: - src_path = make_scaffold_seqs_analysis_summary['outputs'][output_name] - elif output_name in blast_scaffolds_analysis_summary['outputs']: - src_path = blast_scaffolds_analysis_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - - # - # Stage 4: Read mapping. 
- # - current_analysis_stage = 'read_mapping' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - current_analysis_stage_inputs = { - 'filtered_scaffold_blast_results': blast_scaffolds_analysis_summary['outputs']['filtered_scaffold_blast_results'], - 'database': os.path.abspath(args.db), - 'normalized_reads_fwd': normalize_depth_analysis_summary['outputs']['normalized_reads_fwd'], - 'normalized_reads_rev': normalize_depth_analysis_summary['outputs']['normalized_reads_rev'], - } - log.info(f'Beginning analysis stage: {current_analysis_stage}') - - map_reads_analysis_summary = analysis.map_reads( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - args.min_mapping_quality, - ) - if map_reads_analysis_summary['return_code'] != 0: - log.error(f'Error in analysis stage: {current_analysis_stage}') - log.error(f'Error code: {map_reads_analysis_summary["return_code"]}') - exit(map_reads_analysis_summary['return_code']) - - # - # Publish outputs and logs - outputs_to_publish = { - 'mapping_refs': os.path.join(args.outdir), - 'alignment': os.path.join(args.outdir), - 'alignment_index': os.path.join(args.outdir), - } - for output_name, output_dir in outputs_to_publish.items(): - src_path = map_reads_analysis_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - - # - # Stage 5: Variant calling. 
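In the read-mapping stage just completed, `analysis.map_reads` filters the BWA alignment with `samtools view -f 1 -F 2828 -q <min_qual>`. For reference, flag 2828 is the sum of these standard SAM flag bits:

```python
# Bits excluded by -F 2828 (samtools view) in analysis.map_reads:
EXCLUDED_FLAGS = {
    4: 'read unmapped',
    8: 'mate unmapped',
    256: 'secondary alignment',
    512: 'read fails platform/vendor quality checks',
    2048: 'supplementary alignment',
}
assert sum(EXCLUDED_FLAGS) == 2828

for bit, meaning in sorted(EXCLUDED_FLAGS.items()):
    print(f'{bit:>4}\t{meaning}')
# -f 1 additionally requires the "read paired" bit to be set.
```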
- # - current_analysis_stage = 'variant_calling' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - current_analysis_stage_inputs = { - 'mapping_refs': map_reads_analysis_summary['outputs']['mapping_refs'], - 'alignment': map_reads_analysis_summary['outputs']['alignment'], - 'alignment_index': map_reads_analysis_summary['outputs']['alignment_index'], - } - log.info(f'Beginning analysis stage: {current_analysis_stage}') - - variant_calling_params = { - 'variant_threshold_calling': args.variant_threshold_calling, - 'variant_threshold_masking': args.variant_threshold_masking, - 'min_depth': args.min_depth, - 'min_mapping_quality': args.min_mapping_quality, - 'coverage_limit': args.coverage_limit, - } - - call_variants_analysis_summary = analysis.call_variants( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - variant_calling_params, - ) - if call_variants_analysis_summary['return_code'] != 0: - log.error(f'Error in analysis stage: {current_analysis_stage}') - log.error(f'Error code: {call_variants_analysis_summary["return_code"]}') - exit(call_variants_analysis_summary['return_code']) - - # - # Publish outputs and logs - outputs_to_publish = { - 'variants_filtered': os.path.join(args.outdir), - } - for output_name, output_dir in outputs_to_publish.items(): - src_path = call_variants_analysis_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - # - # Stage 6: Consensus calling. 
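The `variant_calling_params` assembled above mirror the CLI defaults, and `cli_args.validate_args` requires the masking threshold to sit below the calling threshold. A minimal illustration:

```python
variant_calling_params = {
    'variant_threshold_calling': 0.75,  # CLI default (-v)
    'variant_threshold_masking': 0.25,  # CLI default (-V)
    'min_depth': 20,                    # CLI default (-D)
    'min_mapping_quality': 20,          # CLI default (-q)
    'coverage_limit': 200,              # CLI default (-L)
}

# Interdependent rule enforced by cli_args.validate_args:
assert (variant_calling_params['variant_threshold_masking']
        < variant_calling_params['variant_threshold_calling'])
```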
- # - current_analysis_stage = 'consensus_calling' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - current_analysis_stage_inputs = { - 'variants_filtered': call_variants_analysis_summary['outputs']['variants_filtered'], - 'mapping_refs': map_reads_analysis_summary['outputs']['mapping_refs'], - 'masked_positions': call_variants_analysis_summary['outputs']['masked_positions'], - } - log.info(f'Beginning analysis stage: {current_analysis_stage}') - - make_consensus_seqs_analysis_summary = analysis.make_consensus_seqs( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - ) - if make_consensus_seqs_analysis_summary['return_code'] != 0: - log.error(f'Error in analysis stage: {current_analysis_stage}') - log.error(f'Error code: {make_consensus_seqs_analysis_summary["return_code"]}') - exit(make_consensus_seqs_analysis_summary['return_code']) - - # - # Publish outputs and logs - outputs_to_publish = { - 'consensus_seqs': os.path.join(args.outdir), - } - for output_name, output_dir in outputs_to_publish.items(): - src_path = make_consensus_seqs_analysis_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - - # - # Stage 7: Summary reporting and plotting. 
- # - current_analysis_stage = 'summary_reporting' - current_analysis_stage_index = analysis_stages.index(current_analysis_stage) - current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}') - current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir) - current_analysis_stage_inputs = { - 'scaffolds': make_scaffold_seqs_analysis_summary['outputs']['scaffolds'], - 'mapping_refs': map_reads_analysis_summary['outputs']['mapping_refs'], - 'alignment': map_reads_analysis_summary['outputs']['alignment'], - 'depth_of_cov_freebayes': call_variants_analysis_summary['outputs']['depth_of_cov_freebayes'], - 'low_coverage_positions': call_variants_analysis_summary['outputs']['low_coverage_positions'], - 'ambiguous_positions': call_variants_analysis_summary['outputs']['ambiguous_positions'], - 'variant_positions': call_variants_analysis_summary['outputs']['variant_positions'], - 'consensus_seqs': make_consensus_seqs_analysis_summary['outputs']['consensus_seqs'], - } - log.info(f'Beginning analysis stage: {current_analysis_stage}') - - reporting_summary = report.write_report( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - ) - - plotting_summary = plots.make_plots( - current_analysis_stage_inputs, - current_analysis_stage_outdir, - args.output_name, - ) - - # - # Publish outputs and logs - outputs_to_publish = { - 'report': os.path.join(args.outdir), - 'depth_of_cov_plots': os.path.join(args.outdir), - } - for output_name, output_dir in outputs_to_publish.items(): - if output_name in reporting_summary['outputs']: - src_path = reporting_summary['outputs'][output_name] - elif output_name in plotting_summary['outputs']: - src_path = plotting_summary['outputs'][output_name] - dest_path = os.path.join(output_dir, os.path.basename(src_path)) - shutil.copy(src_path, dest_path) - log.info(f'Published output: {output_name} -> {dest_path}') - - analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs') - analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir)) - os.makedirs(analysis_stage_logs_dest_dir) - for log_file in os.listdir(analysis_stage_logs_src_dir): - src_path = os.path.join(analysis_stage_logs_src_dir, log_file) - dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file) - shutil.copy(src_path, dest_path) - log.info(f'Published log file: {log_file} -> {dest_path}') - - log.info(f'Analysis stage complete: {current_analysis_stage}') - - - if not args.disable_garbage_collection: - analysis_by_stage_dir = os.path.join(args.outdir, 'analysis_by_stage') - for stage in analysis_stages: - stage_index = analysis_stages.index(stage) - stage_outdir = os.path.join(analysis_by_stage_dir, f'{stage_index:02}_{stage}') - shutil.rmtree(stage_outdir) - log.info(f'Removed intermediate files for analysis stage: {stage}') - shutil.rmtree(analysis_by_stage_dir) - else: - log.info('Garbage collection disabled. Intermediate files were not removed.') - - log.info('Analysis complete.') - exit(0) - - -if __name__ == '__main__': - main() diff --git a/fluviewer/logging_config.py b/fluviewer/logging_config.py deleted file mode 100644 index 6851ca4..0000000 --- a/fluviewer/logging_config.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging -import os -import sys - -def configure_logging(log_level, log_file): - """ - Configure logging to log to stderr and args.outdir/logs/fluviewer.log. 
- Should configure in such a way that all other modules can - call logging.getLogger(__name__) and have the same log settings. - - """ - logger = logging.getLogger() - logger.setLevel(log_level) - - # Use ISO 8601 format, milliseconds, no timezone - formatter = logging.Formatter( - fmt='%(asctime)s.%(msecs)03d - %(levelname)s %(module)s.%(funcName)s L%(lineno)d : %(message)s', - datefmt='%Y-%m-%dT%H:%M:%S' - ) - - stderr_handler = logging.StreamHandler(sys.stderr) - stderr_handler.setFormatter(formatter) - logger.addHandler(stderr_handler) - - logs_dir = os.path.join(os.path.dirname(log_file)) - if not os.path.exists(logs_dir): - os.makedirs(logs_dir) - - fluviewer_log_file_path = os.path.join(logs_dir, f'fluviewer.log') - log_file_handler = logging.FileHandler(fluviewer_log_file_path) - log_file_handler.setFormatter(formatter) - logger.addHandler(log_file_handler) - diff --git a/fluviewer/parsers.py b/fluviewer/parsers.py deleted file mode 100644 index fa0ada2..0000000 --- a/fluviewer/parsers.py +++ /dev/null @@ -1,242 +0,0 @@ -import os -import json -import logging - -log = logging.getLogger(__name__) - -def convert_cell_to_int(cell): - """ - Cells are reported with a "K", "M", or "G" suffix. This function - converts the cell size to an integer. - - :param cell: The cell size. - :type cell: str - :return: The cell size as an integer. - :rtype: int - """ - cell_suffix = cell[-1] - cell_size = float(cell[:-1]) - if cell_suffix == "K": - cell_size = cell_size * 1000 - elif cell_suffix == "M": - cell_size = cell_size * 1000000 - elif cell_suffix == "B": - cell_size = cell_size * 1000000000 - - cell_size_int = round(cell_size) - - return cell_size_int - - -def parse_bbnorm_log(bbnorm_log_path): - """ - Parse the log file generated by BBnorm. - - """ - int_fields = [ - 'threads', - 'k', - 'passes', - 'bits_per_cell', - 'hashes', - 'base_min_quality', - 'target_depth', - 'min_depth', - 'max_depth', - 'min_good_kmers', - 'estimated_unique_kmers', - 'total_kmers_counted', - 'total_unique_kmer_count', - ] - float_fields = [ - 'kmer_min_prob', - 'depth_percentile', - 'corrected_depth_average', - 'approx_read_depth_median', - ] - bool_fields = [ - 'deterministic', - 'toss_error_reads', - 'ignore_dupe_kmers', - 'fix_spikes', - ] - percent_fields = [ - 'percent_unique', - ] - value_with_percent_fields = [ - 'total_reads_in', - 'total_bases_in', - 'error_reads_in', - 'error_pairs_in', - 'error_type_1', - 'error_type_2', - 'error_type_3', - ] - seconds_fields = [ - 'table_creatiion_time', - ] - seconds_with_rate_fields = [ - 'table_read_time', - ] - - unique_or_all_kmers_fields = [ - 'depth_average', - 'depth_median', - 'depth_standard_deviation', - 'corrected_depth_average', - ] - pass_num_key = 'pass_0' - parsed_bbnorm_log = { - pass_num_key: {} - } - with open(bbnorm_log_path, 'r') as f: - for line in f: - bbnorm_key = None - bbnorm_value = None - line = line.strip() - if "Pass" in line: - line_split = line.split() - try: - pass_num = int(line_split[2]) - except ValueError as e: - log.error(f"Could not parse pass number number from bbnrorm log line: {line}") - raise e - pass_num_key = 'pass_' + str(pass_num) - parsed_bbnorm_log[pass_num_key] = {} - - if line.startswith("Settings:"): - continue - if line.startswith("Started output threads."): - continue - if line.startswith("Removing temp files."): - continue - if line.startswith("Total time:"): - time_str = line.split(":")[1].strip() - time_str_split = time_str.split() - time_seconds = float(time_str_split[0]) - time_units = time_str_split[1] - 
parsed_bbnorm_log['total_time_seconds'] = time_seconds - rate_str = time_str_split[2] - rate = float(rate_str) - rate_units = time_str_split[3] - rate_units = rate_units.replace("/", "_per_") - parsed_bbnorm_log['total_rate_' + rate_units] = rate - - continue - - if not line.startswith("Made hash table:"): - line_split = line.split(":") - if len(line_split) != 2: - continue - - bbnorm_key = line_split[0].lower().replace(" ", "_") - - bbnorm_value_str = line_split[1].strip() - if bbnorm_key in int_fields: - try: - bbnorm_value = int(bbnorm_value_str) - except ValueError as e: - log.error(f"Could not parse integer value from bbnorm log line: {line}") - bborm_value = None - elif bbnorm_key in float_fields: - try: - bbnorm_value = float(bbnorm_value_str) - except ValueError as e: - log.error(f"Could not parse float value from bbnorm log line: {line}") - bbnorm_value = None - elif bbnorm_key in bool_fields: - try: - bbnorm_value = bool(bbnorm_value_str) - except ValueError as e: - log.error(f"Could not parse boolean value from bbnorm log line: {line}") - bbnorm_value = None - elif bbnorm_key in percent_fields: - bbnorm_value = float(bbnorm_value_str.split("%")[0]) - parsed_bbnorm_log[pass_num_key][bbnorm_key] = bbnorm_value - elif bbnorm_key == "cells": - bbnorm_value = convert_cell_to_int(bbnorm_value_str) - elif bbnorm_key == "table_creation_time": - bbnorm_key = "table_creation_time_seconds" - bbnorm_value = float(bbnorm_value_str.split(" ")[0]) - elif bbnorm_key in value_with_percent_fields: - bbnorm_value_int_str = bbnorm_value_str.split(" ")[0] - bbnorm_value_int = int(bbnorm_value_int_str) - parsed_bbnorm_log[pass_num_key][bbnorm_key] = bbnorm_value_int - bbnorm_value_percent_str = bbnorm_value_str.split()[1] - bbnorm_value_percent = float(bbnorm_value_percent_str.split("%")[0]) - if len(bbnorm_value_str.split()) == 3 and bbnorm_key.endswith("_in"): - bbnorm_key = bbnorm_key.strip("_in") + "_kept_percent" - parsed_bbnorm_log[pass_num_key][bbnorm_key] = bbnorm_value_percent - else: - bbnorm_key = bbnorm_key + "_percent" - parsed_bbnorm_log[pass_num_key][bbnorm_key] = bbnorm_value_percent - elif bbnorm_key == "approx._read_depth_median": - bbnorm_key = "approx_read_depth_median" - bbnorm_value = float(bbnorm_value_str) - parsed_bbnorm_log[pass_num_key][bbnorm_key] = bbnorm_value - elif bbnorm_key in seconds_fields: - bbnorm_seconds_float_str = bbnorm_value_str.split(" ")[0] - bbnorm_seconds = float(bbnorm_seconds_float_str) - parsed_bbnorm_log[pass_num_key][bbnorm_key + "_seconds"] = bbnorm_seconds - elif bbnorm_key in seconds_with_rate_fields: - bbnorm_seconds_float_str = bbnorm_value_str.split(" ")[0] - bbnorm_seconds = float(bbnorm_seconds_float_str) - parsed_bbnorm_log[pass_num_key][bbnorm_key + "_seconds"] = bbnorm_seconds - - bbnorm_rate_str = bbnorm_value_str.split()[2] - bbnorm_rate = float(bbnorm_rate_str) - - rate_units = bbnorm_value_str.split()[3] - rate_units = rate_units.replace("/", "_per_") - bbnorm_key = bbnorm_key + "_" + rate_units - parsed_bbnorm_log[pass_num_key][bbnorm_key] = bbnorm_rate - elif bbnorm_key in unique_or_all_kmers_fields: - bbnorm_value_str_split = bbnorm_value_str.split() - bbnorm_value = float(bbnorm_value_str_split[0]) - unique_or_all_kmers = "_".join([ - bbnorm_value_str_split[1].replace("(", ""), - bbnorm_value_str_split[2].replace(")", "") - ]) - bbnorm_key = bbnorm_key + "_" + unique_or_all_kmers - parsed_bbnorm_log[pass_num_key][bbnorm_key] = bbnorm_value - - else: - bbnorm_value = bbnorm_value_str - parsed_bbnorm_log[pass_num_key][bbnorm_key] = 
bbnorm_value - - elif line.startswith("Made hash table:"): - hash_table = {} - line_split = line.split(":") - bbnorm_key = line_split[0].lower().replace(" ", "_") - try: - hash_table_str = line_split[1].strip() - # split on multiple spaces - hash_table_str_split = hash_table_str.split(" ") - for hash_table_metric_str in hash_table_str_split: - hash_table_metric_split = hash_table_metric_str.split("=") - hash_table_metric_key = hash_table_metric_split[0].strip().lower().replace(" ", "_") - hash_table_metric_value_str = hash_table_metric_split[1].strip() - if hash_table_metric_key == 'hashes': - hash_table_metric_value = int(hash_table_metric_value_str) - elif hash_table_metric_key == 'mem': - hash_table_metric_value = float(hash_table_metric_value_str.split(" ")[0]) - hash_table_metric_units = hash_table_metric_value_str.split(" ")[1] - hash_table_metric_key = hash_table_metric_key + "_" + hash_table_metric_units - elif hash_table_metric_key == 'cells': - hash_table_metric_value = convert_cell_to_int(hash_table_metric_value_str) - elif hash_table_metric_key == 'used': - hash_table_metric_key = 'percent_used_memory' - hash_table_metric_value = float(hash_table_metric_value_str.split("%")[0]) - else: - hash_table_metric_value = hash_table_metric_value_str - - hash_table[hash_table_metric_key] = hash_table_metric_value - - except IndexError as e: - log.error(f"Could not parse hash table from bbnorm log line: {line}") - raise e - parsed_bbnorm_log[pass_num_key]['hash_table'] = hash_table - - parsed_bbnorm_log.pop('pass_0') - - return parsed_bbnorm_log diff --git a/fluviewer/plots.py b/fluviewer/plots.py deleted file mode 100644 index 3527d7d..0000000 --- a/fluviewer/plots.py +++ /dev/null @@ -1,122 +0,0 @@ -import logging -import os -import datetime - -from math import ceil, log10 - -import pandas as pd -import seaborn as sb -import matplotlib.pyplot as plt - -from fluviewer.analysis import run - - -log = logging.getLogger(__name__) - -def make_plots(inputs, outdir, out_name): - """ - Generate depth of coverage plots for each segment. - - :param inputs: dictionary of input files. - :type inputs: dict - :param outdir: Path to the output directory. - :type outdir: str - :param out_name: Name used for outputs. - :type out_name: str - """ - log.info('Making depth of coverage plots...') - log_dir = os.path.join(outdir, out_name) - if not os.path.exists(log_dir): - os.makedirs(log_dir) - - timestamp_analysis_start = datetime.datetime.now().isoformat() - analysis_summary = { - 'timestamp_analysis_start': timestamp_analysis_start, - 'inputs': inputs, - } - - # Get depth of coverage using samtools. - filtered_alignment = inputs['alignment'] - depth_of_cov = os.path.join(outdir, f'{out_name}_depth_of_cov_samtools.tsv') - terminal_command = (f'samtools depth {filtered_alignment} > {depth_of_cov}') - process_name = 'samtools_depth' - error_code = 24 - return_code = run(terminal_command, outdir, out_name, process_name, error_code) - if return_code != 0: - log.error('Error running samtools depth.') - - cols = ['seq_name', 'position', 'depth_samtools'] - samtools_data = pd.read_csv(depth_of_cov, sep='\t', names=cols) - - # Get depth of coverage from FreeBayes. - depth_of_cov = inputs['depth_of_cov_freebayes'] - cols = ['seq_name', 'position', 'depth_freebayes'] - freebayes_data = pd.read_csv(depth_of_cov, sep='\t', names=cols) - - # Merge samtools and freebayes data. - data = pd.merge(samtools_data, freebayes_data, on=['seq_name', 'position'], how='left') - - # Annotate with segment. 
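The merge and segment annotation performed below combine the per-position samtools and FreeBayes depth tables on `(seq_name, position)` and pull the segment out of the second |-delimited header field. A small illustration with hypothetical depth values:

```python
import pandas as pd

# Hypothetical depth records; the FreeBayes table may lack some positions,
# so the merge is a left join, as in make_plots.
samtools_data = pd.DataFrame({'seq_name': ['sample-01|HA|H3'] * 2,
                              'position': [1, 2],
                              'depth_samtools': [150, 148]})
freebayes_data = pd.DataFrame({'seq_name': ['sample-01|HA|H3'],
                               'position': [1],
                               'depth_freebayes': [142]})
data = pd.merge(samtools_data, freebayes_data,
                on=['seq_name', 'position'], how='left')
data['segment'] = data['seq_name'].str.split('|').str[1]
print(data)
```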
diff --git a/fluviewer/plots.py b/fluviewer/plots.py
deleted file mode 100644
index 3527d7d..0000000
--- a/fluviewer/plots.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import logging
-import os
-import datetime
-
-from math import ceil, log10
-
-import pandas as pd
-import seaborn as sb
-import matplotlib.pyplot as plt
-
-from fluviewer.analysis import run
-
-
-log = logging.getLogger(__name__)
-
-def make_plots(inputs, outdir, out_name):
-    """
-    Generate depth of coverage plots for each segment.
-
-    :param inputs: dictionary of input files.
-    :type inputs: dict
-    :param outdir: Path to the output directory.
-    :type outdir: str
-    :param out_name: Name used for outputs.
-    :type out_name: str
-    """
-    log.info('Making depth of coverage plots...')
-    log_dir = os.path.join(outdir, out_name)
-    if not os.path.exists(log_dir):
-        os.makedirs(log_dir)
-
-    timestamp_analysis_start = datetime.datetime.now().isoformat()
-    analysis_summary = {
-        'timestamp_analysis_start': timestamp_analysis_start,
-        'inputs': inputs,
-    }
-
-    # Get depth of coverage using samtools.
-    filtered_alignment = inputs['alignment']
-    depth_of_cov = os.path.join(outdir, f'{out_name}_depth_of_cov_samtools.tsv')
-    terminal_command = (f'samtools depth {filtered_alignment} > {depth_of_cov}')
-    process_name = 'samtools_depth'
-    error_code = 24
-    return_code = run(terminal_command, outdir, out_name, process_name, error_code)
-    if return_code != 0:
-        log.error('Error running samtools depth.')
-
-    cols = ['seq_name', 'position', 'depth_samtools']
-    samtools_data = pd.read_csv(depth_of_cov, sep='\t', names=cols)
-
-    # Get depth of coverage from FreeBayes.
-    depth_of_cov = inputs['depth_of_cov_freebayes']
-    cols = ['seq_name', 'position', 'depth_freebayes']
-    freebayes_data = pd.read_csv(depth_of_cov, sep='\t', names=cols)
-
-    # Merge samtools and freebayes data.
-    data = pd.merge(samtools_data, freebayes_data, on=['seq_name', 'position'], how='left')
-
-    # Annotate with segment.
-    get_segment = lambda row: row['seq_name'].split('|')[1]
-    data['segment'] = data.apply(get_segment, axis=1)
-
-    # Make plots.
-    sb.set_style('whitegrid')
-    segments = data['segment'].unique()
-    fig_size = (8.5, 2 * len(segments))
-    fig, axs = plt.subplots(len(segments), 1, sharex=True, figsize=fig_size)
-    max_position = data['position'].max()
-    x_ticks = [100 * i for i in range(1, ceil(max_position / 100))]
-    max_depth = max([data['depth_samtools'].max(), data['depth_freebayes'].max()])
-    y_max = 10 ** ceil(log10(max_depth))
-    y_ticks = [10 ** i for i in range(ceil(log10(max_depth)))]
-    for segment, ax in zip(segments, axs):
-        segment_data = data[data['segment'] == segment]
-        sb.lineplot(x='position', y='depth_samtools', ax=ax, data=segment_data,
-                    color='grey')
-        sb.lineplot(x='position', y='depth_freebayes', ax=ax, data=segment_data,
-                    color='black')
-        ax.set_xlim(1, max_position)
-        ax.set_xlabel('Position')
-        ax.set_xticks(x_ticks)
-        if ax == axs[-1]:
-            ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')
-        else:
-            ax.set_xticklabels([])
-        ax.set_ylim(1, y_max)
-        ax.set_ylabel('Read depth')
-        ax.set_yscale('log')
-        ax.set_yticks(y_ticks)
-        ax.set_title(segment)
-        ax.axvspan(segment_data['position'].max(), max_position, color='grey')
-        low_cov_path = inputs['low_coverage_positions']
-        with open(low_cov_path, 'r') as input_file:
-            for line in input_file:
-                seq_name, start, stop = line.strip().split('\t')
-                if seq_name.split('|')[1] == segment:
-                    for position in range(int(start), int(stop) + 1):
-                        ax.axvline(position, color='red', alpha=0.5)
-        ambig_path = inputs['ambiguous_positions']
-        with open(ambig_path, 'r') as input_file:
-            for line in input_file:
-                seq_name, position = line.strip().split('\t')
-                if seq_name.split('|')[1] == segment:
-                    ax.axvline(int(position), color='orange', alpha=0.5)
-        variant_positions_path = inputs['variant_positions']
-        with open(variant_positions_path, 'r') as input_file:
-            for line in input_file:
-                seq_name, position = line.strip().split('\t')
-                if seq_name.split('|')[1] == segment:
-                    ax.axvline(int(position), color='blue', alpha=0.5)
-    plt.tight_layout()
-    plots = os.path.join(outdir, f'{out_name}_depth_of_cov.png')
-    plt.savefig(plots, dpi=400)
-    plt.close()
-
-    outputs = {'depth_of_cov_plots': plots}
-    analysis_summary['outputs'] = outputs
-    analysis_summary['timestamp_analysis_complete'] = datetime.datetime.now().isoformat()
-
-    log.info(f'Depth of coverage plots saved to {plots}')
-
-    return analysis_summary
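
`make_plots` returns the same style of analysis summary as the other stages, with the PNG path under `outputs['depth_of_cov_plots']`. A sketch of calling it directly is shown below; every file path is a placeholder standing in for an output of an earlier stage, and only the input keys the function actually reads are included.

```python
# Illustrative call to make_plots; the paths are placeholders, not real outputs.
from fluviewer.plots import make_plots

inputs = {
    'alignment': 'outdir/sample-01_alignment.bam',                      # placeholder
    'depth_of_cov_freebayes': 'outdir/sample-01_depth_freebayes.tsv',   # placeholder
    'low_coverage_positions': 'outdir/sample-01_low_cov.tsv',           # placeholder
    'ambiguous_positions': 'outdir/sample-01_ambig.tsv',                # placeholder
    'variant_positions': 'outdir/sample-01_variants.tsv',               # placeholder
}

summary = make_plots(inputs, 'outdir', 'sample-01')
print(summary['outputs']['depth_of_cov_plots'])  # outdir/sample-01_depth_of_cov.png
```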
diff --git a/fluviewer/report.py b/fluviewer/report.py
deleted file mode 100644
index ced5684..0000000
--- a/fluviewer/report.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import logging
-import os
-import datetime
-
-import pandas as pd
-import numpy as np
-
-from fluviewer.analysis import run
-
-
-log = logging.getLogger(__name__)
-
-
-def write_report(inputs, outdir, out_name):
-    """
-    Generate a report for each segment.
-
-    :param inputs: dictionary of input files.
-    :type inputs: dict
-    :param outdir: Path to the output directory.
-    :type outdir: str
-    :param out_name: Name used for outputs.
-    :type out_name: str
-    """
-    log.info('Writing report...')
-    log_dir = os.path.join(outdir, out_name)
-    if not os.path.exists(log_dir):
-        os.makedirs(log_dir)
-
-    timestamp_analysis_start = datetime.datetime.now().isoformat()
-    analysis_summary = {
-        'timestamp_analysis_start': timestamp_analysis_start,
-        'inputs': inputs,
-    }
-
-    outputs = {}
-
-    # Count reads mapped to each segment and add to report.
-    filtered_alignment = inputs['alignment']
-    reads_mapped = os.path.join(outdir, f'{out_name}_reads_mapped.tsv')
-    terminal_command = (f'samtools idxstats {filtered_alignment} > '
-                        f'{reads_mapped}')
-    process_name = 'samtools_idxstats'
-    error_code = 23
-    return_code = run(terminal_command, outdir, out_name, process_name, error_code)
-    if return_code != 0:
-        log.error('Error running samtools idxstats.')
-        analysis_summary['error'] = 'Error running samtools idxstats.'
-        analysis_summary['return_code'] = error_code
-        return analysis_summary
-
-    cols = 'seq_name seq_length reads_mapped reads_unmapped'.split(' ')
-    reads_mapped = pd.read_csv(reads_mapped, sep='\t', names=cols)
-    reads_mapped = reads_mapped.replace('*', np.nan).dropna()
-    get_seq_name = lambda row: '|'.join(row['seq_name'].split('|')[:3])
-    reads_mapped['seq_name'] = reads_mapped.apply(get_seq_name, axis=1)
-    cols = ['seq_name', 'reads_mapped', 'seq_length']
-    report = reads_mapped[cols].drop_duplicates()
-
-    # Annotate segment and subtype.
-    get_segment = lambda row: row['seq_name'].split('|')[1]
-    report['segment'] = report.apply(get_segment, axis=1)
-    get_subtype = lambda row: row['seq_name'].split('|')[2]
-    report['subtype'] = report.apply(get_subtype, axis=1)
-
-    # Add scaffold completeness to report.
-    seqs = {}
-    scaffolds = inputs['scaffolds']
-    with open(scaffolds, 'r') as input_file:
-        for line in input_file:
-            if line[0] == '>':
-                seq_name = line[1:].strip()
-                seqs[seq_name] = ''
-            else:
-                seqs[seq_name] += line.strip()
-    completeness = {}
-    for seq_name, seq in seqs.items():
-        segment = seq_name.split('|')[1].split('_')[0]
-        perc = sum(seq.count(base) for base in 'ATGC') * 100 / len(seq)
-        perc = round(perc, 2)
-        completeness[segment] = perc
-    report['scaffold_completeness'] = report['segment'].map(completeness)
-
-    # Add consensus completeness to report.
-    seqs = {}
-    consensus_seqs = inputs['consensus_seqs']
-    with open(consensus_seqs, 'r') as input_file:
-        for line in input_file:
-            if line[0] == '>':
-                seq_name = line[1:].strip()
-                seqs[seq_name] = ''
-            else:
-                seqs[seq_name] += line.strip()
-    completeness = {}
-    for seq_name, seq in seqs.items():
-        segment = seq_name.split('|')[1]
-        perc = sum(seq.count(base) for base in 'ATGC') * 100 / len(seq)
-        perc = round(perc, 2)
-        completeness[segment] = perc
-    report['consensus_completeness'] = report['segment'].map(completeness)
-
-    # Add best ref seq to report.
-    ref_seqs_used = {}
-    mapping_refs = inputs['mapping_refs']
-    with open(mapping_refs, 'r') as input_file:
-        for line in input_file:
-            if line[0] == '>':
-                line = line[1:].strip().split('|')
-                seq_name, segment, subtype = line[:3]
-                accession, ref_name, ref_subtype = line[3:]
-                seq_name = f'{seq_name}|{segment}|{subtype}'
-                ref_seqs_used[seq_name] = (f'{accession}|{ref_name}'
-                                           f'({ref_subtype})')
-    report['ref_seq_used'] = report['seq_name'].map(ref_seqs_used)
-
-    # Write report to TSV file.
-    segment_order = 'PB2 PB1 PA HA NP NA M NS'.split(' ')
-    get_sort_value = lambda row: segment_order.index(row['segment'])
-    report['sort'] = report.apply(get_sort_value, axis=1)
-    report = report.sort_values(by='sort')
-    cols = ['seq_name', 'segment', 'subtype', 'reads_mapped', 'seq_length',
-            'scaffold_completeness', 'consensus_completeness', 'ref_seq_used']
-    report = report[cols]
-    report_path = os.path.join(outdir, f'{out_name}_report.tsv')
-    report.to_csv(report_path, index=False, sep='\t')
-
-    outputs['report'] = report_path
-
-    analysis_summary['outputs'] = outputs
-    timestamp_analysis_complete = datetime.datetime.now().isoformat()
-    analysis_summary['timestamp_analysis_complete'] = timestamp_analysis_complete
-
-    return analysis_summary
-
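
`write_report` writes a per-segment TSV with the columns listed near the end of the function and records its path under `outputs['report']`. A short sketch of consuming that report, for example to flag segments with incomplete consensus sequences, is shown below; the path and the 90% threshold are illustrative, not FluViewer defaults.

```python
# Sketch of downstream use of the report written by write_report; the path and
# completeness threshold are illustrative only.
import pandas as pd

report = pd.read_csv('outdir/sample-01_report.tsv', sep='\t')  # placeholder path
incomplete = report[report['consensus_completeness'] < 90.0]   # arbitrary threshold
print(incomplete[['seq_name', 'segment', 'subtype', 'consensus_completeness']])
```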
diff --git a/setup.py b/setup.py
deleted file mode 100644
index 6d20cce..0000000
--- a/setup.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from setuptools import setup, find_namespace_packages
-
-import fluviewer
-
-setup(
-    name='fluviewer',
-    version=fluviewer.__version__,
-    packages=find_namespace_packages(),
-    entry_points={
-        "console_scripts": [
-            "FluViewer = fluviewer.fluviewer:main",
-            "fluviewer = fluviewer.fluviewer:main",
-        ]
-    },
-    scripts=[],
-    package_data={
-    },
-    python_requires='>=3.7',
-    install_requires=[
-    ],
-    description='',
-    url='https://github.com/BCCDC-PHL/FluViewer',
-    author='Kevin Kuchinski',
-    author_email='',
-    include_package_data=True,
-    keywords=[],
-    zip_safe=False
-)
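
The `console_scripts` entry points in `setup.py` register both `FluViewer` and `fluviewer` as commands that dispatch to `fluviewer.fluviewer:main`. After installation, invoking either command is roughly equivalent to the sketch below; how `main` parses its arguments is defined in `fluviewer/fluviewer.py`, which is not shown in this part of the patch.

```python
# Rough equivalent of what the installed FluViewer / fluviewer console scripts do;
# main() lives in fluviewer/fluviewer.py, whose argument handling is not shown here.
from fluviewer.fluviewer import main

if __name__ == '__main__':
    raise SystemExit(main())
```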