Skip to content

Commit

Permalink
[YDF] Support DISCRETIZED_NUMERICAL features for in-memory data
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 690540821
  • Loading branch information
rstz authored and copybara-github committed Oct 28, 2024
1 parent ad6f412 commit 772b008
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 85 deletions.
1 change: 1 addition & 0 deletions yggdrasil_decision_forests/port/python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Add support for Avro file for path / distributed training with the "avro:"
prefix.
- Add support for discretized numerical features for in-memory datasets.

## HEAD

Expand Down
8 changes: 8 additions & 0 deletions yggdrasil_decision_forests/port/python/ydf/cc/ydf.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class VerticalDataset:
ydf_dtype: Optional[data_spec_pb2.DType],
column_idx: Optional[int],
) -> None: ...
# Type stub for the native `PopulateColumnDiscretizedNumericalNPFloat32`
# binding. Appends the float32 values in `data` to a discretized numerical
# column. When `column_idx` is None, a new column called `name` is created and
# `maximum_num_bins` (if not None) overrides the default number of bins; when
# `column_idx` is given, the existing column is extended and its dataspec is
# left unchanged.
def PopulateColumnDiscretizedNumericalNPFloat32(
self,
name: str,
data: npt.NDArray[np.float32],
ydf_dtype: Optional[data_spec_pb2.DType],
maximum_num_bins: Optional[int],
column_idx: Optional[int],
) -> None: ...
def PopulateColumnBooleanNPBool(
self,
name: str,
Expand Down
132 changes: 123 additions & 9 deletions yggdrasil_decision_forests/port/python/ydf/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include "absl/container/flat_hash_map.h"
Expand Down Expand Up @@ -64,6 +65,8 @@ namespace {

using NumericalColumn =
::yggdrasil_decision_forests::dataset::VerticalDataset::NumericalColumn;
using DiscretizedNumericalColumn = ::yggdrasil_decision_forests::dataset::
VerticalDataset::DiscretizedNumericalColumn;
using BooleanColumn =
::yggdrasil_decision_forests::dataset::VerticalDataset::BooleanColumn;
using CategoricalColumn =
Expand Down Expand Up @@ -105,9 +108,14 @@ absl::Status SetAndCheckNumRowsAndFillMissing(dataset::VerticalDataset& self,
return absl::OkStatus();
}

// Creates a column spec for a numerical column.
absl::StatusOr<dataset::proto::Column> CreateNumericalColumnSpec(
const absl::string_view name, const StridedSpanFloat32 values) {
// Collects statistics about a numerical column to use for the column spec. If
// the `unique_counts` parameter is given, builds a map of unique values of the
// column. The name set in `column` is used in the error messages.
absl::Status CollectNumericalColumnStatistics(
const StridedSpanFloat32 values, dataset::proto::Column* column,
absl::flat_hash_map<float, int>* unique_counts) {
DCHECK(column->has_name());

size_t num_valid_values = 0;
double sum_values = 0;
double sum_square_values = 0;
Expand All @@ -122,7 +130,15 @@ absl::StatusOr<dataset::proto::Column> CreateNumericalColumnSpec(
}
if (std::isinf(value)) {
return absl::InvalidArgumentError(absl::Substitute(
"Found infinite value for numerical feature $0", name));
"Found infinite value for numerical feature $0", column->name()));
}
if (unique_counts != nullptr) {
const auto iter = unique_counts->find(value);
if (iter != unique_counts->end()) {
iter->second++;
} else {
(*unique_counts)[value] = 1;
}
}

sum_values += value;
Expand All @@ -144,12 +160,9 @@ absl::StatusOr<dataset::proto::Column> CreateNumericalColumnSpec(
num_valid_values++;
}

dataset::proto::Column column;
column.set_name(name);
column.set_type(dataset::proto::ColumnType::NUMERICAL);
column.set_count_nas(values.size() - num_valid_values);
column->set_count_nas(values.size() - num_valid_values);

auto* colum_num = column.mutable_numerical();
auto* colum_num = column->mutable_numerical();
if (num_valid_values > 0) {
const double mean = sum_values / num_valid_values;
const double var = sum_square_values / num_valid_values - mean * mean;
Expand All @@ -159,6 +172,53 @@ absl::StatusOr<dataset::proto::Column> CreateNumericalColumnSpec(
colum_num->set_mean(mean);
colum_num->set_standard_deviation(std::sqrt(var));
}
return absl::OkStatus();
}

// Builds the dataspec entry of a NUMERICAL column called `name`. The column
// statistics are collected from `values`; any error raised while collecting
// them is returned to the caller instead of a spec.
absl::StatusOr<dataset::proto::Column> CreateNumericalColumnSpec(
    const absl::string_view name, const StridedSpanFloat32 values) {
  dataset::proto::Column spec;
  spec.set_type(dataset::proto::ColumnType::NUMERICAL);
  // The name has to be set before collecting the statistics, as it is used in
  // the error messages.
  spec.set_name(name);
  // Unique values are not needed for plain numerical columns.
  RETURN_IF_ERROR(CollectNumericalColumnStatistics(values, &spec,
                                                   /*unique_counts=*/nullptr));
  return spec;
}

// Builds the dataspec entry of a DISCRETIZED_NUMERICAL column called `name`
// from the raw float `values`.
//
// When `maximum_num_bins` is not provided, the default value of the
// corresponding proto field is used. Besides the numerical statistics, the
// returned spec contains the number of unique values observed and the
// discretization boundaries generated from the sorted (value, count) pairs.
absl::StatusOr<dataset::proto::Column> CreateDiscretizedNumericalColumnSpec(
    const absl::string_view name, const StridedSpanFloat32 values,
    std::optional<int64_t> maximum_num_bins) {
  dataset::proto::Column spec;
  spec.set_name(name);
  spec.set_type(dataset::proto::ColumnType::DISCRETIZED_NUMERICAL);

  // Write the binning parameters explicitly into the spec, falling back on the
  // proto defaults for what the caller did not specify.
  {
    auto* binning = spec.mutable_discretized_numerical();
    const int64_t effective_max_bins =
        maximum_num_bins.has_value()
            ? *maximum_num_bins
            : static_cast<int64_t>(binning->maximum_num_bins());
    binning->set_maximum_num_bins(effective_max_bins);
    binning->set_min_obs_in_bins(binning->min_obs_in_bins());
  }

  // Gather the numerical statistics together with the number of occurrences
  // of each distinct value.
  absl::flat_hash_map<float, int> count_per_value;
  RETURN_IF_ERROR(
      CollectNumericalColumnStatistics(values, &spec, &count_per_value));

  auto* discretized = spec.mutable_discretized_numerical();
  discretized->set_original_num_unique_values(count_per_value.size());

  // The hash map has no defined order: copy the (value, count) pairs into a
  // vector and sort them.
  std::vector<std::pair<float, int>> sorted_histogram(count_per_value.begin(),
                                                      count_per_value.end());
  std::sort(sorted_histogram.begin(), sorted_histogram.end());

  // Zero and the column mean are handed to the boundary generator as its last
  // argument.
  const float column_mean = static_cast<float>(spec.numerical().mean());
  ASSIGN_OR_RETURN(const auto boundaries,
                   dataset::GenDiscretizedBoundaries(
                       sorted_histogram, discretized->maximum_num_bins(),
                       discretized->min_obs_in_bins(), {0.f, column_mean}));
  *discretized->mutable_boundaries() = {boundaries.begin(), boundaries.end()};
  return spec;
}

Expand Down Expand Up @@ -206,6 +266,56 @@ absl::Status PopulateColumnNumericalNPFloat32(
return absl::OkStatus();
}

// Discretizes the values of `data` and appends them to a discretized numerical
// column of `self`.
//
// If `column_idx` is set, the values are appended to that existing column and
// discretized with the column spec already present in the dataspec. Otherwise,
// a new column called `name` is created: its spec is computed from `data`
// (using `maximum_num_bins` if provided) and `ydf_dtype`, if provided, is
// recorded in it.
//
// This function only creates the column and copies the data. It does not set
// `num_rows` on the dataset; the caller has to set it before using the
// dataset.
absl::Status PopulateColumnDiscretizedNumericalNPFloat32(
    dataset::VerticalDataset& self, const std::string& name,
    py::array_t<float>& data, std::optional<dataset::proto::DType> ydf_dtype,
    std::optional<int64_t> maximum_num_bins, std::optional<int> column_idx) {
  StridedSpanFloat32 src_values(data);

  DiscretizedNumericalColumn* dst_column = nullptr;
  dataset::proto::Column column_spec;
  if (column_idx.has_value()) {
    // Existing column: the dataspec is neither computed nor updated, the
    // stored spec is simply reused to discretize the new values.
    ASSIGN_OR_RETURN(
        dst_column,
        self.MutableColumnWithCastWithStatus<DiscretizedNumericalColumn>(
            column_idx.value()));
    column_spec = self.data_spec().columns(column_idx.value());
  } else {
    // New column: build its spec from the data.
    ASSIGN_OR_RETURN(column_spec, CreateDiscretizedNumericalColumnSpec(
                                      name, src_values, maximum_num_bins));
    if (ydf_dtype.has_value()) {
      column_spec.set_dtype(*ydf_dtype);
    }

    // Register the column in the dataset and get a typed handle on it.
    ASSIGN_OR_RETURN(auto* new_column, self.AddColumn(column_spec));
    ASSIGN_OR_RETURN(
        dst_column,
        new_column->MutableCastWithStatus<DiscretizedNumericalColumn>());
  }

  // Grow the destination once, then write the discretized index of each new
  // value after the values already stored.
  std::vector<dataset::DiscretizedNumericalIndex>& dst_values =
      *dst_column->mutable_values();
  const size_t num_new_values = src_values.size();
  const size_t first_new_row = dst_values.size();
  dst_values.resize(first_new_row + num_new_values);
  for (size_t src_idx = 0; src_idx < num_new_values; ++src_idx) {
    dst_values[first_new_row + src_idx] =
        dataset::NumericalToDiscretizedNumerical(column_spec,
                                                 src_values[src_idx]);
  }

  return absl::OkStatus();
}

// Creates a column spec for a boolean column.
absl::StatusOr<dataset::proto::Column> CreateBooleanColumnSpec(
const std::string& name, const StridedSpan<bool> values) {
Expand Down Expand Up @@ -879,6 +989,10 @@ void init_dataset(py::module_& m) {
WithStatus(PopulateColumnNumericalNPFloat32), py::arg("name"),
py::arg("data").noconvert(), py::arg("ydf_dtype"),
py::arg("column_idx") = std::nullopt)
.def("PopulateColumnDiscretizedNumericalNPFloat32",
WithStatus(PopulateColumnDiscretizedNumericalNPFloat32),
py::arg("name"), py::arg("data").noconvert(), py::arg("ydf_dtype"),
py::arg("maximum_num_bins"), py::arg("column_idx") = std::nullopt)
.def("PopulateColumnBooleanNPBool",
WithStatus(PopulateColumnBooleanNPBool), py::arg("name"),
py::arg("data").noconvert(), py::arg("ydf_dtype"),
Expand Down
62 changes: 49 additions & 13 deletions yggdrasil_decision_forests/port/python/ydf/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,21 @@ def _add_column(

original_column_data = column_data

if column.semantic == dataspec.Semantic.NUMERICAL:
if (
column.semantic == dataspec.Semantic.NUMERICAL
or column.semantic == dataspec.Semantic.DISCRETIZED_NUMERICAL
):
assert column.semantic is not None # Appease pylint.
if not isinstance(column_data, np.ndarray):
column_data = np.array(column_data, np.float32)
ydf_dtype = dataspec.np_dtype_to_ydf_dtype(column_data.dtype)

if column_data.dtype != np.float32:
log.warning(
"Column '%s' with NUMERICAL semantic has dtype %s. Casting value"
"Column '%s' with %s semantic has dtype %s. Casting value"
" to float32.",
column.name,
column.semantic.name,
column_data.dtype.name,
message_id=log.WarningMessage.CAST_NUMERICAL_TO_FLOAT32,
is_strict=True,
Expand All @@ -122,20 +127,38 @@ def _add_column(
column_data = column_data.astype(np.float32)
except ValueError as e:
raise ValueError(
f"Cannot convert NUMERICAL column {column.name!r} of type"
f" {_type(column_data)} and with content={column_data!r} to"
f"Cannot convert {column.semantic.name} column {column.name!r} of"
f" type {_type(column_data)} and with content={column_data!r} to"
" np.float32 values.\nNote: If the column is a label, make sure"
" the training task is compatible. For example, you cannot train"
" a regression model (task=ydf.Task.REGRESSION) on a string"
" column."
) from e

self._dataset.PopulateColumnNumericalNPFloat32(
column.name,
column_data,
ydf_dtype=ydf_dtype,
column_idx=column_idx, # `column_idx` may be None
)
if column.semantic == dataspec.Semantic.NUMERICAL:
self._dataset.PopulateColumnNumericalNPFloat32(
column.name,
column_data,
ydf_dtype=ydf_dtype,
column_idx=column_idx, # `column_idx` may be None
)
elif column.semantic == dataspec.Semantic.DISCRETIZED_NUMERICAL:
if (
column.num_discretized_numerical_bins is None
and inference_args is not None
):
column.num_discretized_numerical_bins = (
inference_args.num_discretized_numerical_bins
)
self._dataset.PopulateColumnDiscretizedNumericalNPFloat32(
column.name,
column_data,
ydf_dtype=ydf_dtype,
maximum_num_bins=column.num_discretized_numerical_bins,
column_idx=column_idx, # `column_idx` may be None
)
else:
raise ValueError("Not reached")
return

elif column.semantic == dataspec.Semantic.BOOLEAN:
Expand Down Expand Up @@ -713,8 +736,14 @@ def dataspec_to_normalized_columns(
column_data = []
else:
column_data = data[column.name]

if column.semantic is None:
infered_semantic = infer_semantic(column.name, column_data)
discretize_numerical = (
inference_args is None
) or inference_args.discretize_numerical_columns
infered_semantic = infer_semantic(
column.name, column_data, discretize_numerical
)
effective_column.semantic = infered_semantic
columns_to_check.append(column_idx)

Expand Down Expand Up @@ -789,7 +818,11 @@ def look_numerical(v: str) -> bool:
return False


def infer_semantic(name: str, data: Any) -> dataspec.Semantic:
def infer_semantic(
name: str,
data: Any,
discretize_numerical: bool,
) -> dataspec.Semantic:
"""Infers the semantic of a column from its data."""

# If a column has no data, we assume it only contains missing values.
Expand All @@ -810,7 +843,10 @@ def infer_semantic(name: str, data: Any) -> dataspec.Semantic:
data.dtype.type in dataspec.NP_SUPPORTED_INT_DTYPE
or data.dtype.type in dataspec.NP_SUPPORTED_FLOAT_DTYPE
):
return dataspec.Semantic.NUMERICAL
if discretize_numerical:
return dataspec.Semantic.DISCRETIZED_NUMERICAL
else:
return dataspec.Semantic.NUMERICAL

if data.dtype.type in [np.bytes_, np.str_]:
return dataspec.Semantic.CATEGORICAL
Expand Down
Loading

0 comments on commit 772b008

Please sign in to comment.