Add all and any in DataFrame (#568)
This PR adds `all` and `any` in DataFrame.

Also, this PR fixes `Series.quantile` so it works with Spark 2.3, by avoiding the `arrays_zip` expression, which requires Spark 2.4+.
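A minimal usage sketch of the new API (mirroring the doctests added below; it assumes a koalas installation backed by a running Spark session):

```python
import databricks.koalas as ks

df = ks.DataFrame({'col1': [True, True, True],
                   'col2': [True, False, False]})

# Column-wise logical AND / OR, each returned as a koalas Series.
df.all()  # col1: True, col2: False
df.any()  # col1: True, col2: True
```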
HyukjinKwon authored and ueshin committed Jul 17, 2019
1 parent 4bbe022 commit bf63c7e
Showing 5 changed files with 197 additions and 23 deletions.
160 changes: 160 additions & 0 deletions databricks/koalas/frame.py
@@ -5510,6 +5510,166 @@ def melt(self, id_vars=None, value_vars=None, var_name='variable',

         return DataFrame(exploded_df)

+    # TODO: axis, skipna, and many other arguments should be implemented.
+    def all(self, axis: Union[int, str] = 0) -> 'Series':
+        """
+        Return whether all elements are True.
+
+        Returns True unless there is at least one element within a series that is
+        False or equivalent (e.g. zero or empty).
+
+        Parameters
+        ----------
+        axis : {0 or 'index'}, default 0
+            Indicate which axis or axes should be reduced.
+
+            * 0 / 'index' : reduce the index, return a Series whose index is the
+              original column labels.
+
+        Examples
+        --------
+        Create a dataframe from a dictionary.
+
+        >>> df = ks.DataFrame({
+        ...     'col1': [True, True, True],
+        ...     'col2': [True, False, False],
+        ...     'col3': [0, 0, 0],
+        ...     'col4': [1, 2, 3],
+        ...     'col5': [True, True, None],
+        ...     'col6': [True, False, None]},
+        ...     columns=['col1', 'col2', 'col3', 'col4', 'col5', 'col6'])
+
+        Default behaviour checks if column-wise values all return True.
+
+        >>> df.all()
+        col1     True
+        col2    False
+        col3    False
+        col4     True
+        col5     True
+        col6    False
+        Name: all, dtype: bool
+
+        Returns
+        -------
+        Series
+        """
+
+        if axis not in [0, 'index']:
+            raise ValueError('axis should be either 0 or "index" currently.')
+
+        applied = []
+        data_columns = self._internal.data_columns
+        for column in data_columns:
+            col = self[column]._scol
+            all_col = F.min(F.coalesce(col.cast('boolean'), F.lit(True)))
+            applied.append(F.when(all_col.isNull(), True).otherwise(all_col))
+
+        # TODO: there is similar transpose logic in, for instance,
+        # DataFrame.any and Series.quantile. Maybe we should deduplicate it.
+        sdf = self._sdf
+        internal_index_column = "__index_level_0__"
+        value_column = "value"
+        cols = []
+        for data_column, applied_col in zip(data_columns, applied):
+            cols.append(F.struct(
+                F.lit(data_column).alias(internal_index_column),
+                applied_col.alias(value_column)))
+
+        sdf = sdf.select(
+            F.array(*cols).alias("arrays")
+        ).select(F.explode(F.col("arrays")))
+
+        sdf = sdf.selectExpr("col.*")
+
+        internal = self._internal.copy(
+            sdf=sdf,
+            data_columns=[value_column],
+            index_map=[(internal_index_column, None)])
+
+        ser = DataFrame(internal)[value_column].rename("all")
+        return ser
+
+    # TODO: axis, skipna, and many other arguments should be implemented.
+    def any(self, axis: Union[int, str] = 0) -> 'Series':
+        """
+        Return whether any element is True.
+
+        Returns False unless there is at least one element within a series that is
+        True or equivalent (e.g. non-zero or non-empty).
+
+        Parameters
+        ----------
+        axis : {0 or 'index'}, default 0
+            Indicate which axis or axes should be reduced.
+
+            * 0 / 'index' : reduce the index, return a Series whose index is the
+              original column labels.
+
+        Examples
+        --------
+        Create a dataframe from a dictionary.
+
+        >>> df = ks.DataFrame({
+        ...     'col1': [False, False, False],
+        ...     'col2': [True, False, False],
+        ...     'col3': [0, 0, 1],
+        ...     'col4': [0, 1, 2],
+        ...     'col5': [False, False, None],
+        ...     'col6': [True, False, None]},
+        ...     columns=['col1', 'col2', 'col3', 'col4', 'col5', 'col6'])
+
+        Default behaviour checks whether any value in each column is True.
+
+        >>> df.any()
+        col1    False
+        col2     True
+        col3     True
+        col4     True
+        col5    False
+        col6     True
+        Name: any, dtype: bool
+
+        Returns
+        -------
+        Series
+        """
+
+        if axis not in [0, 'index']:
+            raise ValueError('axis should be either 0 or "index" currently.')
+
+        applied = []
+        data_columns = self._internal.data_columns
+        for column in data_columns:
+            col = self[column]._scol
+            any_col = F.max(F.coalesce(col.cast('boolean'), F.lit(False)))
+            applied.append(F.when(any_col.isNull(), False).otherwise(any_col))
+
+        # TODO: there is similar transpose logic in, for instance,
+        # DataFrame.all and Series.quantile. Maybe we should deduplicate it.
+        sdf = self._sdf
+        internal_index_column = "__index_level_0__"
+        value_column = "value"
+        cols = []
+        for data_column, applied_col in zip(data_columns, applied):
+            cols.append(F.struct(
+                F.lit(data_column).alias(internal_index_column),
+                applied_col.alias(value_column)))
+
+        sdf = sdf.select(
+            F.array(*cols).alias("arrays")
+        ).select(F.explode(F.col("arrays")))
+
+        sdf = sdf.selectExpr("col.*")
+
+        internal = self._internal.copy(
+            sdf=sdf,
+            data_columns=[value_column],
+            index_map=[(internal_index_column, None)])
+
+        ser = DataFrame(internal)[value_column].rename("any")
+        return ser

     # TODO: add axis, numeric_only, pct, na_option parameter
     def rank(self, method='average', ascending=True):
         """
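The reduction above leans on two Spark tricks: `min`/`max` over booleans behave as logical AND/OR (Spark orders `false < true`), and an array of `(column name, value)` structs plus `explode` turns the single aggregate row into one row per column. A standalone sketch of the same pattern in plain PySpark — the Spark session and column names here are illustrative, not part of the commit:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(True, 0), (True, 1), (None, 3)],
                            ["col1", "col2"])

# min over booleans == AND; nulls are coalesced to True so they are skipped,
# exactly as in DataFrame.all above.
all_cols = [F.struct(F.lit(c).alias("__index_level_0__"),
                     F.min(F.coalesce(F.col(c).cast("boolean"),
                                      F.lit(True))).alias("value"))
            for c in sdf.columns]

# One aggregate row -> array of structs -> one (name, value) row per column.
sdf.select(F.array(*all_cols).alias("arrays")) \
   .select(F.explode("arrays")) \
   .selectExpr("col.*") \
   .show()
# col1 -> true, col2 -> false; `any` is the same with F.max and F.lit(False).
```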
2 changes: 0 additions & 2 deletions databricks/koalas/missing/frame.py
@@ -45,8 +45,6 @@ class _MissingPandasLikeDataFrame(object):
     agg = unsupported_function('agg')
     aggregate = unsupported_function('aggregate')
     align = unsupported_function('align')
-    all = unsupported_function('all')
-    any = unsupported_function('any')
     apply = unsupported_function('apply')
     asfreq = unsupported_function('asfreq')
     asof = unsupported_function('asof')
55 changes: 34 additions & 21 deletions databricks/koalas/series.py
@@ -2254,7 +2254,7 @@ def quantile(self, q=0.5, accuracy=10000):
         Parameters
         ----------
         q : float or array-like, default 0.5 (50% quantile)
-            0 <= q <= 1, the quantile(s) to compute. If q is array-like, Spark 2.4+ is required.
+            0 <= q <= 1, the quantile(s) to compute.
         accuracy : int, optional
             Default accuracy of approximation. Larger value means better accuracy.
             The relative error can be deduced by 1.0 / accuracy.
@@ -2272,9 +2272,9 @@ def quantile(self, q=0.5, accuracy=10000):
         >>> s.quantile(.5)
         3

-        >>> s.quantile([.25, .5, .75])  # doctest: +SKIP
+        >>> s.quantile([.25, .5, .75])
         0.25    2
-        0.50    3
+        0.5     3
         0.75    4
         Name: 0, dtype: int64
         """
@@ -2293,37 +2293,50 @@
"percentiles should all be in the interval [0, 1].")

if isinstance(q, list):
quantiles = q
# TODO: avoid to use dataframe. After this, anchor will be lost.

# First calculate the percentiles and map it to given `q`.
# First calculate the percentiles and map it to each `quantiles`
# by creating each entry as a struct. So, it becomes an array of
# structs as below:
#
# +-----------+
# | col |
# +-----------+
# | [0.25, 2] |
# | [0.50, 3] |
# | [0.75, 4] |
# +-----------+
# +--------------------------------+
# | arrays |
# +--------------------------------+
# |[[0.25, 2], [0.5, 3], [0.75, 4]]|
# +--------------------------------+
sdf = self._kdf._sdf
args = ", ".join(map(str, q))
args = ", ".join(map(str, quantiles))
percentile_col = F.expr(
"approx_percentile(`%s`, array(%s), %s)" % (self.name, args, accuracy))
sdf = sdf.select(
percentile_col.alias("arrays")
).select(F.explode(F.arrays_zip(F.expr("array(%s)" % args), F.col("arrays"))))
sdf = sdf.select(percentile_col.alias("percentiles"))

# And then, explode it and manually set the index.
internal_index_column = "__index_level_0__"
data_column = "value"
sdf = sdf.selectExpr("col.*").selectExpr(
"`0` as %s" % internal_index_column, "arrays as %s" % data_column)
value_column = "value"
cols = []
for i, quantile in enumerate(quantiles):
cols.append(F.struct(
F.lit("%s" % quantile).alias(internal_index_column),
F.expr("percentiles[%s]" % i).alias(value_column)))
sdf = sdf.select(F.array(*cols).alias("arrays"))

# And then, explode it and manually set the index.
#
# +-----------------+-----+
# |__index_level_0__|value|
# +-----------------+-----+
# | 0.25 | 2|
# | 0.5 | 3|
# | 0.75 | 4|
# +-----------------+-----+
sdf = sdf.select(F.explode(F.col("arrays"))).selectExpr("col.*")

internal = self._kdf._internal.copy(
sdf=sdf,
data_columns=list(data_column),
data_columns=[value_column],
index_map=[(internal_index_column, None)])

ser = DataFrame(internal)[data_column].rename(self.name)
ser = DataFrame(internal)[value_column].rename(self.name)
return ser
else:
return self._reduce_for_stat_function(
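For reference, the Spark 2.3-compatible reshaping in isolation, as plain PySpark (session and column names are illustrative): `approx_percentile` computes all requested quantiles as a single array, which is then paired with its `q` values by index through structs, since `arrays_zip` only exists in Spark 2.4+:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(v,) for v in [1, 2, 3, 4, 5]], ["0"])

quantiles = [0.25, 0.5, 0.75]
args = ", ".join(map(str, quantiles))
sdf = sdf.select(F.expr(
    "approx_percentile(`0`, array(%s), 10000)" % args).alias("percentiles"))

# Pair each quantile with its result by array index instead of arrays_zip.
cols = [F.struct(F.lit("%s" % q).alias("__index_level_0__"),
                 F.expr("percentiles[%s]" % i).alias("value"))
        for i, q in enumerate(quantiles)]
sdf.select(F.array(*cols).alias("arrays")) \
   .select(F.explode("arrays")) \
   .selectExpr("col.*") \
   .show()
# +-----------------+-----+
# |__index_level_0__|value|
# +-----------------+-----+
# |             0.25|    2|
# |              0.5|    3|
# |             0.75|    4|
# +-----------------+-----+
```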
2 changes: 2 additions & 0 deletions docs/source/reference/frame.rst
@@ -101,6 +101,8 @@ Computations / Descriptive Stats
    :toctree: api/

    DataFrame.abs
+   DataFrame.all
+   DataFrame.any
    DataFrame.clip
    DataFrame.corr
    DataFrame.count
1 change: 1 addition & 0 deletions docs/source/reference/series.rst
@@ -114,6 +114,7 @@ Computations / Descriptive Stats
    Series.nlargest
    Series.nsmallest
    Series.nunique
+   Series.quantile
    Series.rank
    Series.skew
    Series.std
