Make bbnorm normalization optional (#40)
* Start work on making bbnorm normalization optional

* begin implementing skipping read normalization

* work on implementing skipping depth normalization

* Fix workflow diagram

* update workflow diagram
dfornika authored Jul 11, 2024
1 parent a7efe7d commit 7876c50
Showing 4 changed files with 120 additions and 90 deletions.
26 changes: 16 additions & 10 deletions README.md
@@ -8,7 +8,7 @@ This codebase is derived from [KevinKuchinski/FluViewer](https://github.com/Kevi

## Analysis Stages

0. **Read Normalization**: The provided reads are normalized and downsampled using a kmer-based approach from [bbmap](https://sourceforge.net/projects/bbmap) called `bbnorm`. This reduces any excessive coverage of certain genome regions.
0. **Read Normalization**: (Optional) The provided reads are normalized and downsampled using a kmer-based approach from [bbmap](https://sourceforge.net/projects/bbmap) called `bbnorm`. This reduces any excessive coverage of certain genome regions.

1. **Assemble Contigs**: The normalized/downsampled reads are assembled de novo into contigs with the [spades](https://github.com/ablab/spades) assembler.

@@ -54,17 +54,17 @@ Each stage selects its inputs from the `outputs` of the previous stages's analys

```mermaid
flowchart TD
forward_reads[Forward Reads] -- input_reads_fwd --> normalization(Read Normalization)
forward_reads[Forward Reads] -- input_reads_fwd --> normalization("Read Normalization (Optional)")
reverse_reads[Reverse Reads] -- input_reads_rev --> normalization
normalization -- normalized_reads_fwd --> assemble_contigs(Assemble Contigs)
normalization -- normalized_reads_rev --> assemble_contigs
normalization -- reads_fwd --> assemble_contigs(Assemble Contigs)
normalization -- reads_rev --> assemble_contigs
fluviewer_db[FluViewer DB] --> blast_contigs(BLAST Contigs)
assemble_contigs -- contigs --> blast_contigs
blast_contigs -- filtered_contig_blast_results --> scaffolding(Scaffolding)
fluviewer_db --> scaffolding
scaffolding -- filtered_scaffold_blast_results --> read_mapping(Read Mapping)
normalization -- normalized_reads_fwd --> read_mapping
normalization -- normalized_reads_rev --> read_mapping
normalization -- reads_fwd --> read_mapping
normalization -- reads_rev --> read_mapping
fluviewer_db --> read_mapping
read_mapping -- mapping_refs --> variant_calling(Variant Calling)
read_mapping -- alignment --> variant_calling
@@ -101,8 +101,8 @@ Custom DBs can be created and used as well (instructions below).
## Usage

```
usage: fluviewer [-h] -f FORWARD_READS -r REVERSE_READS -d DB [-o OUTDIR] -n OUTPUT_NAME [-i [0-100]] [-l [32-]] [-D [1-]] [-q [0-]] [-v [0-1]] [-V [0-1]] [-N [1-]] [-L [1-]] [-t [1-]] [-M [1-]] [-g] [--force]
[--log-level {info,debug}] [--version]
usage: fluviewer [-h] -f FORWARD_READS -r REVERSE_READS -d DB [-o OUTDIR] -n OUTPUT_NAME [-i [0-100]] [-l [32-]] [-D [1-]] [-q [0-]] [-v [0-1]] [-V [0-1]] [-N [1-]] [-L [1-]] [-t [1-]] [-M [1-]] [-g] [--skip-depth-normalization]
[--force] [--log-level {info,debug}] [--version]
BCCDC-PHL/FluViewer: Influenza A virus consensus sequence generation and variant calling
@@ -112,8 +112,7 @@ optional arguments:
Path to FASTQ file containing forward reads
-r REVERSE_READS, --reverse-reads REVERSE_READS
Path to FASTQ file containing reverse reads
-d DATABASE, --db DATABASE
Path to FASTA file containing FluViewer database
-d DB, --db DB Path to FASTA file containing FluViewer database
-o OUTDIR, --outdir OUTDIR
Output directory (default=FluViewer_<output-name>)
-n OUTPUT_NAME, --output-name OUTPUT_NAME
@@ -140,12 +139,19 @@ optional arguments:
Gigabytes of memory allocated for normalizing reads (default=max)
-g, --disable-garbage-collection
Disable garbage collection and retain intermediate analysis files
--skip-depth-normalization
Skip read depth normalization (bbnorm) stage.
--force Allow overwrite of existing files and directories.
--log-level {info,debug}
Log level (default=info)
--version show program's version number and exit
```

### Depth Normalization

Depending on the library preparation method used, some libraries get much higher depth of coverage near the
ends of each segment.
The `normalize_depth` stage is intended to normalize the depth-of-coverage across each segment to a consistent level, but this stage is optional. If you would like to skip the `normalize_depth` stage, simply add the `--skip-depth-normalization` flag. If that flag is used, the `assemble_contigs` stage will start directly with the set of reads supplied with the `--forward-reads` (`-f`) and `--reverse-reads` (`-r`) flags.

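The branching this flag introduces in `fluviewer.py` can be sketched in isolation. The following is a minimal, hypothetical illustration (the helper `select_stage_inputs` is not part of the FluViewer codebase) of how downstream stages choose between raw and normalized reads:

```python
import argparse
import os

def select_stage_inputs(args, normalized_outputs=None):
    # Hypothetical helper mirroring the branching added in fluviewer.py:
    # with --skip-depth-normalization, downstream stages consume the raw
    # input reads; otherwise they consume the bbnorm-normalized reads.
    if args.skip_depth_normalization:
        return {
            'reads_fwd': os.path.abspath(args.forward_reads),
            'reads_rev': os.path.abspath(args.reverse_reads),
        }
    return {
        'reads_fwd': normalized_outputs['normalized_reads_fwd'],
        'reads_rev': normalized_outputs['normalized_reads_rev'],
    }

parser = argparse.ArgumentParser()
parser.add_argument('-f', '--forward-reads')
parser.add_argument('-r', '--reverse-reads')
parser.add_argument('--skip-depth-normalization', action='store_true')

args = parser.parse_args(['-f', 'R1.fastq', '-r', 'R2.fastq',
                          '--skip-depth-normalization'])
inputs = select_stage_inputs(args)
print(sorted(inputs))  # ['reads_fwd', 'reads_rev']
```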
## FluViewer Database

15 changes: 7 additions & 8 deletions fluviewer/analysis.py
@@ -230,8 +230,8 @@ def assemble_contigs(
os.makedirs(logs_dir, exist_ok=True)

spades_output = os.path.join(outdir, 'spades_output')
fwd_reads = inputs.get('normalized_reads_fwd', None)
rev_reads = inputs.get('normalized_reads_rev', None)
fwd_reads = inputs.get('reads_fwd', None)
rev_reads = inputs.get('reads_rev', None)

os.makedirs(spades_output, exist_ok=True)

@@ -1128,11 +1128,10 @@ def make_map_ref(data_frame):

def map_reads(inputs, outdir, out_name, min_qual):
"""
Normalized, downsampled reads (from normalize_depth analysis stage) are mapped to the
mapping references (produced by make_mapping_refs func) using BWA mem. The alignment
is filtered to retain only paired reads, then sorted and indexed.
Reads are mapped to the mapping references (produced by make_mapping_refs func) using BWA mem.
The alignment is filtered to retain only paired reads, then sorted and indexed.
:param inputs: Dictionary of input files, with keys 'normalized_reads_fwd', 'normalized_reads_rev' and 'mapping_refs'.
:param inputs: Dictionary of input files, with keys: 'reads_fwd', 'reads_rev', and 'mapping_refs'.
:type inputs: dict
:param outdir: Path to the output directory.
:type outdir: Path
@@ -1163,8 +1162,8 @@ def map_reads(inputs, outdir, out_name, min_qual):
analysis_summary['error_message'] = error_messages_by_code[error_code]
return analysis_summary

fwd_reads = inputs.get('normalized_reads_fwd', None)
rev_reads = inputs.get('normalized_reads_rev', None)
fwd_reads = inputs.get('reads_fwd', None)
rev_reads = inputs.get('reads_rev', None)
alignment_path = os.path.join(outdir, f'{out_name}_alignment.sam')
terminal_command = (f'bwa mem {mapping_refs_path} {fwd_reads} {rev_reads} '
f'> {alignment_path}')
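The shell command assembled above can be exercised in isolation. This is a hypothetical sketch for illustration only (`build_bwa_mem_command` is not a function in the codebase); it only builds the command string, assuming `bwa` would be on the PATH when the command is eventually run:

```python
import os

def build_bwa_mem_command(mapping_refs_path, fwd_reads, rev_reads, outdir, out_name):
    # Hypothetical helper: assemble the same shell command map_reads runs,
    # aligning the (possibly normalized) read pair against the mapping refs.
    alignment_path = os.path.join(outdir, f'{out_name}_alignment.sam')
    terminal_command = (f'bwa mem {mapping_refs_path} {fwd_reads} {rev_reads} '
                        f'> {alignment_path}')
    return terminal_command, alignment_path

cmd, sam_path = build_bwa_mem_command('refs.fa', 'R1.fastq', 'R2.fastq',
                                      '/tmp/out', 'sample')
print(cmd)
# → bwa mem refs.fa R1.fastq R2.fastq > /tmp/out/sample_alignment.sam
```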
33 changes: 17 additions & 16 deletions fluviewer/cli_args.py
@@ -12,22 +12,23 @@ def parse_args():
:rtype: dict
"""
parser = argparse.ArgumentParser(description='BCCDC-PHL/FluViewer: Influenza A virus consensus sequence generation and variant calling')
parser.add_argument('-f', '--forward-reads', type=Path, required=True, help='Path to FASTQ file containing forward reads')
parser.add_argument('-r', '--reverse-reads', type=Path, required=True, help='Path to FASTQ file containing reverse reads')
parser.add_argument('-d', '--db', type=Path, required=True, help='Path to FASTA file containing FluViewer database')
parser.add_argument('-o', '--outdir', type=Path, help='Output directory (default=FluViewer_<output-name>)')
parser.add_argument('-n', '--output-name', type=str, required=True, help='Output name. Includes this name in output files, and in consensus sequence headers')
parser.add_argument('-i', '--min-identity', type=float, default=90, metavar="[0-100]", help='Minimum percent sequence identity between database reference sequences and contigs (default=90)')
parser.add_argument('-l', '--min-alignment-length', type=int, default=50, metavar="[32-]", help='Minimum length of alignment between database reference sequences and contigs (default=50)')
parser.add_argument('-D', '--min-depth', type=int, default=20, metavar="[1-]", help='Minimum read depth for base calling (default=20)')
parser.add_argument('-q', '--min-mapping-quality', type=int, default=20, metavar="[0-]", help='Minimum PHRED score for mapping quality and base quality during variant calling (default=20)')
parser.add_argument('-v', '--variant-threshold-calling', type=float, default=0.75, metavar="[0-1]", help='Variant allele fraction threshold for calling variants (default=0.75)')
parser.add_argument('-V', '--variant-threshold-masking', type=float, default=0.25, metavar="[0-1]", help='Variant allele fraction threshold for masking ambiguous variants (default=0.25)')
parser.add_argument('-N', '--target-depth', type=int, default=200, metavar="[1-]", help='Target depth for pre-normalization of reads (default=200)')
parser.add_argument('-L', '--coverage-limit', type=int, default=200, metavar="[1-]", help='Coverage depth limit for variant calling (default=200)')
parser.add_argument('-t', '--threads', type=int, default=1, metavar="[1-]", help='Threads used for contig/scaffold alignments (default=1)')
parser.add_argument('-M', '--max-memory', type=int, metavar="[1-]", help='Gigabytes of memory allocated for normalizing reads (default=max)')
parser.add_argument('-g', '--disable-garbage-collection', action='store_true', help='Disable garbage collection and retain intermediate analysis files')
parser.add_argument('-f', '--forward-reads', type=Path, required=True, help='Path to FASTQ file containing forward reads.')
parser.add_argument('-r', '--reverse-reads', type=Path, required=True, help='Path to FASTQ file containing reverse reads.')
parser.add_argument('-d', '--db', type=Path, required=True, help='Path to FASTA file containing FluViewer database.')
parser.add_argument('-o', '--outdir', type=Path, help='Output directory. (default=FluViewer_<output-name>)')
parser.add_argument('-n', '--output-name', type=str, required=True, help='Output name. Includes this name in output files, and in consensus sequence headers.')
parser.add_argument('-i', '--min-identity', type=float, default=90, metavar="[0-100]", help='Minimum percent sequence identity between database reference sequences and contigs. (default=90)')
parser.add_argument('-l', '--min-alignment-length', type=int, default=50, metavar="[32-]", help='Minimum length of alignment between database reference sequences and contigs. (default=50)')
parser.add_argument('-D', '--min-depth', type=int, default=20, metavar="[1-]", help='Minimum read depth for base calling. (default=20)')
parser.add_argument('-q', '--min-mapping-quality', type=int, default=20, metavar="[0-]", help='Minimum PHRED score for mapping quality and base quality during variant calling. (default=20)')
parser.add_argument('-v', '--variant-threshold-calling', type=float, default=0.75, metavar="[0-1]", help='Variant allele fraction threshold for calling variants. (default=0.75)')
parser.add_argument('-V', '--variant-threshold-masking', type=float, default=0.25, metavar="[0-1]", help='Variant allele fraction threshold for masking ambiguous variants. (default=0.25)')
parser.add_argument('-N', '--target-depth', type=int, default=200, metavar="[1-]", help='Target depth for pre-normalization of reads. (default=200)')
parser.add_argument('-L', '--coverage-limit', type=int, default=200, metavar="[1-]", help='Coverage depth limit for variant calling. (default=200)')
parser.add_argument('-t', '--threads', type=int, default=1, metavar="[1-]", help='Threads used for contig/scaffold alignments. (default=1)')
parser.add_argument('-M', '--max-memory', type=int, metavar="[1-]", help='Gigabytes of memory allocated for normalizing reads. (default=max)')
parser.add_argument('-g', '--disable-garbage-collection', action='store_true', help='Disable garbage collection and retain intermediate analysis files.')
parser.add_argument('--skip-depth-normalization', action='store_true', help='Skip read depth normalization (bbnorm) stage.')
parser.add_argument('--force', action='store_true', help='Allow overwrite of existing files and directories.')
parser.add_argument('--log-level', default='info', choices=['info', 'debug'], help='Log level (default=info)')
parser.add_argument('--version', action='version', version=__version__)
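The new flag added above uses argparse's `store_true` action, so it defaults to `False` and flips to `True` only when supplied. A minimal standalone sketch (not part of the codebase) demonstrating that behaviour:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--skip-depth-normalization', action='store_true',
                    help='Skip read depth normalization (bbnorm) stage.')

# store_true: flag absent -> False, flag present -> True
default_args = parser.parse_args([])
skip_args = parser.parse_args(['--skip-depth-normalization'])
print(default_args.skip_depth_normalization, skip_args.skip_depth_normalization)
# → False True
```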
136 changes: 80 additions & 56 deletions fluviewer/fluviewer.py
@@ -82,8 +82,12 @@ def main():
args.output_name,
)

analysis_stages = [
'normalize_depth',
if args.skip_depth_normalization:
analysis_stages = []
else:
analysis_stages = ['normalize_depth']

analysis_stages += [
'assemble_contigs',
'blast_contigs',
'scaffolding',
@@ -97,67 +101,77 @@ def main():
log.info('Starting analysis...')


#
# Stage 0: Normalize depth of reads.
#
current_analysis_stage = 'normalize_depth'
current_analysis_stage_index = analysis_stages.index(current_analysis_stage)
current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}')
current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir)
log.info(f'Beginning analysis stage: {current_analysis_stage}')
log.info(f'Output directory: {current_analysis_stage_outdir}')

current_analysis_stage_inputs = {
'input_reads_fwd': os.path.abspath(args.forward_reads),
'input_reads_rev': os.path.abspath(args.reverse_reads),
}

normalize_depth_analysis_summary = analysis.normalize_depth(
current_analysis_stage_inputs,
current_analysis_stage_outdir,
args.output_name,
os.path.abspath(args.forward_reads),
os.path.abspath(args.reverse_reads),
args.target_depth,
args.max_memory,
)
if normalize_depth_analysis_summary['return_code'] != 0:
log.error(f'Error in analysis stage: {current_analysis_stage}')
log.error(f'Error code: {normalize_depth_analysis_summary["return_code"]}')
exit(normalize_depth_analysis_summary['return_code'])

#
# Publish outputs and logs
outputs_to_publish = {
}
for output_name, output_dir in outputs_to_publish.items():
src_path = normalize_depth_analysis_summary['outputs'][output_name]
dest_path = os.path.join(output_dir, os.path.basename(src_path))
shutil.copy(src_path, dest_path)
log.info(f'Published output: {output_name} -> {dest_path}')
if args.skip_depth_normalization:
log.info('Skipping analysis stage: normalize_depth')
else:
#
# Stage 0: Normalize depth of reads.
#
current_analysis_stage = 'normalize_depth'
current_analysis_stage_index = analysis_stages.index(current_analysis_stage)
current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}')
current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir)
log.info(f'Beginning analysis stage: {current_analysis_stage}')
log.info(f'Output directory: {current_analysis_stage_outdir}')

current_analysis_stage_inputs = {
'input_reads_fwd': os.path.abspath(args.forward_reads),
'input_reads_rev': os.path.abspath(args.reverse_reads),
}

normalize_depth_analysis_summary = analysis.normalize_depth(
current_analysis_stage_inputs,
current_analysis_stage_outdir,
args.output_name,
os.path.abspath(args.forward_reads),
os.path.abspath(args.reverse_reads),
args.target_depth,
args.max_memory,
)
if normalize_depth_analysis_summary['return_code'] != 0:
log.error(f'Error in analysis stage: {current_analysis_stage}')
log.error(f'Error code: {normalize_depth_analysis_summary["return_code"]}')
exit(normalize_depth_analysis_summary['return_code'])

#
# Publish outputs and logs
outputs_to_publish = {
}
for output_name, output_dir in outputs_to_publish.items():
src_path = normalize_depth_analysis_summary['outputs'][output_name]
dest_path = os.path.join(output_dir, os.path.basename(src_path))
shutil.copy(src_path, dest_path)
log.info(f'Published output: {output_name} -> {dest_path}')

analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs')
analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir))
os.makedirs(analysis_stage_logs_dest_dir)
for log_file in os.listdir(analysis_stage_logs_src_dir):
src_path = os.path.join(analysis_stage_logs_src_dir, log_file)
dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file)
shutil.copy(src_path, dest_path)
log.info(f'Published log file: {log_file} -> {dest_path}')
analysis_stage_logs_src_dir = os.path.join(current_analysis_stage_outdir, 'logs')
analysis_stage_logs_dest_dir = os.path.join(logs_dir, os.path.basename(current_analysis_stage_outdir))
os.makedirs(analysis_stage_logs_dest_dir)
for log_file in os.listdir(analysis_stage_logs_src_dir):
src_path = os.path.join(analysis_stage_logs_src_dir, log_file)
dest_path = os.path.join(analysis_stage_logs_dest_dir, log_file)
shutil.copy(src_path, dest_path)
log.info(f'Published log file: {log_file} -> {dest_path}')

log.info(f'Analysis stage complete: {current_analysis_stage}')
log.info(f'Analysis stage complete: {current_analysis_stage}')


#
# Stage 1: Assemble contigs.
#
current_analysis_stage = 'assemble_contigs'
current_analysis_stage_index = analysis_stages.index(current_analysis_stage)

current_analysis_stage_inputs = {
'normalized_reads_fwd': normalize_depth_analysis_summary['outputs']['normalized_reads_fwd'],
'normalized_reads_rev': normalize_depth_analysis_summary['outputs']['normalized_reads_rev'],
}

if not args.skip_depth_normalization:
current_analysis_stage_inputs = {
'reads_fwd': normalize_depth_analysis_summary['outputs']['normalized_reads_fwd'],
'reads_rev': normalize_depth_analysis_summary['outputs']['normalized_reads_rev'],
}
else:
current_analysis_stage_inputs = {
'reads_fwd': os.path.abspath(args.forward_reads),
'reads_rev': os.path.abspath(args.reverse_reads),
}

current_analysis_stage_outdir = os.path.join(args.outdir, 'analysis_by_stage', f'{current_analysis_stage_index:02}_{current_analysis_stage}')
current_analysis_stage_outdir = os.path.abspath(current_analysis_stage_outdir)

@@ -323,9 +337,19 @@ def main():
current_analysis_stage_inputs = {
'filtered_scaffold_blast_results': blast_scaffolds_analysis_summary['outputs']['filtered_scaffold_blast_results'],
'database': os.path.abspath(args.db),
'normalized_reads_fwd': normalize_depth_analysis_summary['outputs']['normalized_reads_fwd'],
'normalized_reads_rev': normalize_depth_analysis_summary['outputs']['normalized_reads_rev'],
}

if args.skip_depth_normalization:
current_analysis_stage_inputs.update({
'reads_fwd': os.path.abspath(args.forward_reads),
'reads_rev': os.path.abspath(args.reverse_reads),
})
else:
current_analysis_stage_inputs.update({
'reads_fwd': normalize_depth_analysis_summary['outputs']['normalized_reads_fwd'],
'reads_rev': normalize_depth_analysis_summary['outputs']['normalized_reads_rev'],
})

log.info(f'Beginning analysis stage: {current_analysis_stage}')

map_reads_analysis_summary = analysis.map_reads(
