-
Notifications
You must be signed in to change notification settings - Fork 42
/
fasta_n50_one_line.py
executable file
·110 lines (88 loc) · 2.7 KB
/
fasta_n50_one_line.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
"""Calculate N50 from an assembled genome fasta file
Usage:
python fasta_n50_one_line.py genome_file [min_length]
"""
# Importing modules
from signal import signal, SIGPIPE, SIG_DFL
from collections import defaultdict
import gzip
import sys
# Defining classes
class Fasta(object):
    """A single fasta record: a header name paired with its sequence.
    """

    def __init__(self, name, sequence):
        # name is stored without the leading ">" marker; write_to_file
        # adds the marker back when the record is serialised.
        self.sequence = sequence
        self.name = name

    def write_to_file(self, handle):
        """Write the record to handle as a header line and a sequence line.

        handle only needs a write() method that accepts text.
        """
        handle.write("".join([">", self.name, "\n"]))
        handle.write("".join([self.sequence, "\n"]))
# Defining functions
def myopen(_file, mode="rt"):
    """Open _file with gzip.open when its name ends in ".gz", else with open.

    The mode is passed through unchanged to whichever opener is selected
    (text reading by default).
    """
    opener = gzip.open if _file.endswith(".gz") else open
    return opener(_file, mode=mode)
def fasta_iterator(input_file):
    """Takes a fasta file input_file and returns a fasta iterator

    Yields one Fasta object per ">" header found in the file, in file
    order. The name is the header line without the leading ">" and the
    sequence is the concatenation of the (whitespace-stripped) lines that
    follow it up to the next header. Lines that appear before the first
    header are ignored. input_file is opened with myopen, so a ".gz"
    suffix selects gzip decompression.
    """
    with myopen(input_file) as f:
        name = ""
        # Sequence lines are collected in a list and joined once per
        # record instead of growing a string line by line.
        chunks = []
        begun = False
        for line in f:
            line = line.strip()
            if line.startswith(">"):
                if begun:
                    yield Fasta(name, "".join(chunks))
                name = line[1:]
                chunks = []
                begun = True
            elif begun:
                chunks.append(line)
        # Gate the last record on having seen a header rather than on a
        # non-empty name, so a trailing record with an empty header is
        # emitted exactly like the same record in the middle of the file.
        if begun:
            yield Fasta(name, "".join(chunks))
def n50_stats(lengths):
    """Compute total length, L50 and N50 for a collection of lengths.

    lengths is any iterable of sequence lengths. They are sorted from
    longest to shortest and accumulated until the running sum reaches at
    least half of the total.

    Returns a tuple (total_length, l50, n50) where l50 is the number of
    sequences needed to reach half of the total and n50 is the length of
    the last sequence added. For an empty input the result is (0, 0, 0).
    """
    ordered = sorted(lengths, reverse=True)
    total_length = sum(ordered)
    half_length = total_length / 2.0
    cumulative_length = 0
    for rank, seq_len in enumerate(ordered, 1):
        cumulative_length += seq_len
        if cumulative_length >= half_length:
            return total_length, rank, seq_len
    # Only reached when there are no sequences at all.
    return total_length, 0, 0


if __name__ == '__main__':
    # Prevent broken pipe error
    signal(SIGPIPE, SIG_DFL)

    # Parse user input
    try:
        genome_file = sys.argv[1]
    except IndexError:
        print(__doc__)
        sys.exit(1)

    # A missing or non-integer min_length falls back to 1
    try:
        min_length = int(sys.argv[2])
    except (IndexError, ValueError):
        min_length = 1

    # Collect the lengths of all sequences that pass the length filter
    sequence_lengths = []
    for seq in fasta_iterator(genome_file):
        length = len(seq.sequence)
        if length >= min_length:
            sequence_lengths.append(length)

    num_seq = len(sequence_lengths)
    total_length, l50, n50 = n50_stats(sequence_lengths)

    # One header line and one tab-separated result line; the L50 column
    # reports the sequence count, not the total length.
    print("#Name\tSize\tNSeq\tL50\tN50")
    print("\t".join([genome_file, str(total_length), str(num_seq),
                     str(l50), str(n50)]))