Skip to content

Commit

Permalink
arraytable.py: cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
jordibc committed Oct 19, 2023
1 parent d304bf3 commit b06bc6e
Showing 1 changed file with 83 additions and 88 deletions.
171 changes: 83 additions & 88 deletions ete4/coretype/arraytable.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import numpy as np

from ..parser.text_arraytable import write_arraytable, read_arraytable

__all__ = ["ArrayTable"]


class ArrayTable:
"""This object is thought to work with matrix datasets (like
microarrays). It allows to load the matrix an access easily to row
and column vectors. """
"""Class to work with matrix datasets (like microarrays).
def __repr__(self):
return "ArrayTable (%s)" % hex(self.__hash__())

def __str__(self):
return str(self.matrix)
It allows to load the matrix and access easily row and column vectors.
"""

def __init__(self, matrix_file=None, mtype="float"):
self.colNames = []
Expand All @@ -26,75 +23,84 @@ def __init__(self, matrix_file=None, mtype="float"):
if matrix_file is not None:
read_arraytable(matrix_file, mtype=mtype, arraytable_object=self)

def __repr__(self):
    """Return a short tag of the form ``ArrayTable (<hex of the hash>)``."""
    return "ArrayTable (%s)" % hex(self.__hash__())

def __str__(self):
    """Return the string representation of the underlying matrix."""
    return str(self.matrix)

def get_row_vector(self, rowname):
    """Return the vector associated to the given row name.

    The lookup goes through ``self.rowValues.get``, so an unknown row
    name does not raise (with a dict-like mapping it yields None).

    :param rowname: Name of the row to look up.
    """
    return self.rowValues.get(rowname)


def get_column_vector(self, colname):
    """Return the vector associated to the given column name.

    The lookup goes through ``self.colValues.get``, so an unknown column
    name does not raise (with a dict-like mapping it yields None).

    :param colname: Name of the column to look up.
    """
    return self.colValues.get(colname)

def get_several_row_vectors(self, rownames):
    """Return an array with the vectors associated to several row names.

    :param rownames: Iterable of row names; the result follows its order.
    :return: NumPy array built from the selected row vectors.
    :raises KeyError: If any of the names is not a known row (the
        vectors are fetched with plain indexing of ``self.rowValues``).
    """
    vectors = [self.rowValues[rname] for rname in rownames]
    return np.array(vectors)

def get_several_column_vectors(self, colnames):
    """Return an array with the vectors associated to several column names.

    :param colnames: Iterable of column names; the result follows its order.
    :return: NumPy array built from the selected column vectors (one
        selected column per entry along the first axis).
    :raises KeyError: If any of the names is not a known column (the
        vectors are fetched with plain indexing of ``self.colValues``).
    """
    vectors = [self.colValues[cname] for cname in colnames]
    return np.array(vectors)

def get_several_row_vectors(self, rownames):
""" Returns a list vectors associated to several row names """
vectors = [self.rowValues[rname] for rname in rownames]
return np.array(vectors)

def remove_column(self, colname):
    """Remove the given column from the current dataset.

    Does nothing if ``colname`` is not a key of ``self.colValues``.
    Otherwise the name is dropped from ``self.colNames`` and the names
    are re-linked to a matrix without that column.

    :param colname: Name of the column to remove.
    """
    # Popping with a default doubles as the "is it there?" check.
    if self.colValues.pop(colname, None) is None:
        return

    index = self.colNames.index(colname)

    # Positions of all the columns that stay (computed before the name
    # list shrinks, so they still refer to the current matrix).
    kept = [i for i in range(len(self.colNames)) if i != index]
    self.colNames.pop(index)

    # Rebuild the name -> vector links on the reduced matrix.
    self._link_names2matrix(self.matrix[:, kept])

def merge_columns(self, groups, grouping_criterion):
""" Returns a new ArrayTable object in which columns are
merged according to a given criterion.
"""Return a new ArrayTable with merged columns.
'groups' argument must be a dictionary in which keys are the
new column names, and each value is the list of current
column names to be merged.
The columns are merged (grouped) according to the given criterion.
'grouping_criterion' must be 'min', 'max' or 'mean', and
defines how numeric values will be merged.
:param groups: Dictionary in which keys are the new column
names, and each value is the list of current column names
to be merged.
:param grouping_criterion: How to merge numeric values. Can be
'min', 'max' or 'mean'.
Example:
my_groups = {'NewColumn':['column5', 'column6']}
new_Array = Array.merge_columns(my_groups, 'max')
Example::
my_groups = {'NewColumn': ['column5', 'column6']}
new_Array = Array.merge_columns(my_groups, 'max')
"""

if grouping_criterion == "max":
grouping_f = get_max_vector
elif grouping_criterion == "min":
grouping_f = get_min_vector
elif grouping_criterion == "mean":
grouping_f = get_mean_vector
else:
raise ValueError("grouping_criterion not supported. Use max|min|mean ")
groupings = {'max': get_max_vector,
'min': get_min_vector,
'mean': get_mean_vector}
try:
grouping_f = groupings[grouping_criterion]
except KeyError:
raise ValueError(f'grouping_criterion "{grouping_criterion}" not '
'supported. Valid ones: %s' % ' '.join(groupings))

grouped_array = self.__class__()
grouped_matrix = []
colNames = []
alltnames = set([])
for gname,tnames in groups.items():
for gname, tnames in groups.items():
all_vectors=[]
for tn in tnames:
if tn not in self.colValues:
raise ValueError(str(tn)+" column not found.")
raise ValueError(f'column not found: {tn}')
if tn in alltnames:
raise ValueError(str(tn)+" duplicated column name for merging")
raise ValueError(f'duplicated column name for merging: {tn}')
alltnames.add(tn)
vector = self.get_column_vector(tn).astype(float)
all_vectors.append(vector)
Expand All @@ -115,8 +121,7 @@ def merge_columns(self, groups, grouping_criterion):
return grouped_array

def transpose(self):
""" Returns a new ArrayTable in which current matrix is transposed. """

"""Return a new ArrayTable in which current matrix is transposed."""
transposedA = self.__class__()
transposedM = self.matrix.transpose()
transposedA.colNames = list(self.rowNames)
Expand All @@ -131,7 +136,7 @@ def transpose(self):
return transposedA

def _link_names2matrix(self, m):
""" Synchronize curent column and row names to the given matrix"""
"""Synchronize curent column and row names to the given matrix."""
if len(self.rowNames) != m.shape[0]:
raise ValueError("Expecting matrix with %d rows" % m.size[0])

Expand All @@ -141,64 +146,54 @@ def _link_names2matrix(self, m):
self.matrix = m
self.colValues.clear()
self.rowValues.clear()

# link columns names to vectors
i = 0
for colname in self.colNames:
for i, colname in enumerate(self.colNames):
self.colValues[colname] = self.matrix[:,i]
i+=1

# link row names to vectors
i = 0
for rowname in self.rowNames:
for i, rowname in enumerate(self.rowNames):
self.rowValues[rowname] = self.matrix[i,:]
i+=1

def write(self, fname, colnames=None):
    """Write this table to ``fname`` by delegating to ``write_arraytable``.

    :param fname: Destination passed through to ``write_arraytable``.
    :param colnames: Passed through unchanged as the ``colnames`` keyword
        (presumably a selection of columns to write -- confirm against
        ``write_arraytable``).
    """
    write_arraytable(self, fname, colnames=colnames)



def get_centroid_dist(vcenter, vlist, fdist):
    """Return twice the mean distance from the vectors to a center.

    :param vcenter: Center that every vector is compared against.
    :param vlist: Sized collection of vectors.
    :param fdist: Distance function, called as ``fdist(v, vcenter)``.
    :raises ZeroDivisionError: If ``vlist`` is empty.
    """
    return 2 * sum(fdist(v, vcenter) for v in vlist) / len(vlist)


def get_average_centroid_linkage_dist(vcenter1, vlist1,
                                      vcenter2, vlist2, fdist):
    """Return the mean distance from each group's vectors to the other center.

    Vectors in ``vlist1`` are measured against ``vcenter2`` and vectors
    in ``vlist2`` against ``vcenter1``; the total is divided by the
    number of vectors in both groups.

    :param vcenter1: Center of the first group.
    :param vlist1: Sized collection with the vectors of the first group.
    :param vcenter2: Center of the second group.
    :param vlist2: Sized collection with the vectors of the second group.
    :param fdist: Distance function, called as ``fdist(v, center)``.
    :raises ZeroDivisionError: If both groups are empty.
    """
    d1 = sum(fdist(v, vcenter2) for v in vlist1)
    d2 = sum(fdist(v, vcenter1) for v in vlist2)

    return (d1 + d2) / (len(vlist1) + len(vlist2))


def safe_mean(values):
    """Return the mean and std of the values, discarding non-finite ones.

    Entries for which ``np.isfinite`` is false (NaN, inf, -inf) are
    dropped before the statistics are computed.

    :param values: Iterable of numbers.
    :return: Tuple ``(mean, std)`` as computed by ``np.mean`` and ``np.std``.
    """
    valid_values = [v for v in values if np.isfinite(v)]
    return np.mean(valid_values), np.std(valid_values)


def safe_mean_vector(vectors):
    """Return the mean and std profiles, discarding non-finite values.

    For every position, the mean and standard deviation are computed
    over the finite values that the vectors have at that position.

    :param vectors: Non-empty sequence of equally long vectors (the
        length is taken from the first one).
    :return: Tuple ``(means, stds)``. With several vectors both are
        lists with one value per position. With a single vector, the
        vector itself is returned as the mean (without filtering) and
        an array of zeros as the std.
    """
    # If only one vector, avg = itself
    if len(vectors) == 1:
        return vectors[0], np.zeros(len(vectors[0]))

    means = []
    stds = []
    for i in range(len(vectors[0])):  # take vector length from the first item
        values = [v[i] for v in vectors if np.isfinite(v[i])]

        means.append(np.mean(values))
        stds.append(np.std(values))

    return means, stds


def get_mean_vector(vlist):
    """Return the position-wise mean of the vectors (mean along axis 0).

    :param vlist: Sequence of equally long vectors.
    """
    return np.mean(vlist, 0)

Expand Down

0 comments on commit b06bc6e

Please sign in to comment.