diff --git a/src/probabilistic_model/learning/nyga_distribution.py b/src/probabilistic_model/learning/nyga_distribution.py
index a6ce200..b7034f1 100644
--- a/src/probabilistic_model/learning/nyga_distribution.py
+++ b/src/probabilistic_model/learning/nyga_distribution.py
@@ -1,3 +1,4 @@
+import collections
 from typing import Optional, List
 
 import portion
@@ -20,6 +21,11 @@ class NygaDistribution(ContinuousDistribution, DeterministicSumUnit):
     The relative, minimal likelihood improvement needed to create a new quantile.
     """
 
+    min_samples_per_quantile: int = 1
+    """
+    The minimal number of samples per quantile.
+    """
+
     def __init__(self, variable: Continuous, min_likelihood_improvement: Optional[float] = None, parent: 'Unit' = None):
         super().__init__(variable, None)
         DeterministicSumUnit.__init__(self, self.variables, [], parent)
@@ -35,27 +41,66 @@ def fit(self, data: List[float]) -> Self:
         """
         # sort the data
         sorted_data = list(sorted(data))
-        ...
+
+        minimal_distribution = UniformDistribution(self.variable, portion.closed(sorted_data[0], sorted_data[-1]))
+        minimal_average_likelihood = (sum([minimal_distribution.likelihood([value]) for value in sorted_data])
+                                      / len(sorted_data))
+
+        compute_most_likely_split_parameters = collections.deque()
+        compute_most_likely_split_parameters.append((sorted_data, 0, len(data)))
+
+        while len(compute_most_likely_split_parameters) > 0:
+
+            current_data, start_index, end_index = compute_most_likely_split_parameters.pop()
+
+            # calculate the best possible split of the current data slice
+            new_maximum_likelihood, best_sum_node, split_index = self.compute_most_likely_split(current_data)
+
+            # if no further splits could be made
+            if split_index is None:
+                break
+
+            # if the improvement is not large enough
+            if new_maximum_likelihood <= minimal_average_likelihood * self.min_likelihood_improvement:
+                break
+
+        return self
 
     def compute_most_likely_split(self, data: List[float]):
         maximum_likelihood = 0
         best_sum_node = None
+        index = None
+        for index in range(self.min_samples_per_quantile, len(data) - self.min_samples_per_quantile):
 
-        for index in range(1, len(data) - 1):
             distribution = self._create_deterministic_uniform_mixture_from_datasets(data[:index], data[index:])
             average_likelihood = sum([distribution.likelihood([value]) for value in data]) / len(data)
             if average_likelihood > maximum_likelihood:
                 maximum_likelihood = average_likelihood
                 best_sum_node = distribution
 
-        return maximum_likelihood, best_sum_node
+        return maximum_likelihood, best_sum_node, index
 
     def _create_deterministic_uniform_mixture_from_datasets(self, left_data: List[float],
                                                             right_data: List[float]) -> DeterministicSumUnit:
+        """
+        Create a deterministic uniform mixture from two datasets.
+        The left dataset is covered by a uniform distribution that ranges from its first point up to (excluding)
+        the midpoint between the last point of the left dataset and the first point of the right dataset.
+        The right dataset is covered by a uniform distribution from that midpoint (inclusive) to its last point.
+        The weights of the mixture correspond to the relative sizes of the datasets.
+
+        :param left_data: The data for the left distribution.
+        :param right_data: The data for the right distribution.
+        :return: A deterministic uniform mixture of the two datasets.
+        """
+
+        connecting_point = (left_data[-1] + right_data[0]) / 2
         # creat uniform distribution from the left including to the right excluding
-        left_uniform_distribution = UniformDistribution(self.variable, portion.closedopen(left_data[0], right_data[0]))
-        right_uniform_distribution = UniformDistribution(self.variable, portion.closed(right_data[0], right_data[-1]))
+        left_uniform_distribution = UniformDistribution(self.variable,
+                                                        portion.closedopen(left_data[0], connecting_point))
+        right_uniform_distribution = UniformDistribution(self.variable,
+                                                         portion.closed(connecting_point, right_data[-1]))
 
         datapoints_total = len(left_data) + len(right_data)
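
A minimal sketch of the midpoint split that _create_deterministic_uniform_mixture_from_datasets now builds, mirroring the assertions in test_nyga_distribution.py further below. The variable x and the import paths are assumptions based on the file paths in this patch.

import portion
from random_events.variables import Continuous
from probabilistic_model.learning.nyga_distribution import NygaDistribution
from probabilistic_model.probabilistic_circuit.distributions import UniformDistribution

x = Continuous("x")  # assumed continuous variable
left_data, right_data = [1, 2, 3], [4, 7, 9]

# the connecting point is the midpoint between the last left point and the first right point
connecting_point = (left_data[-1] + right_data[0]) / 2  # 3.5

mixture = NygaDistribution(x)._create_deterministic_uniform_mixture_from_datasets(left_data, right_data)

# left child covers [1, 3.5), right child covers [3.5, 9]; weights are the relative dataset sizes
assert mixture.children[0] == UniformDistribution(x, portion.closedopen(1, connecting_point))
assert mixture.children[1] == UniformDistribution(x, portion.closed(connecting_point, 9))
assert mixture.weights == [3 / 6, 3 / 6]
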
diff --git a/src/probabilistic_model/probabilistic_circuit/distributions.py b/src/probabilistic_model/probabilistic_circuit/distributions.py
index 1d0d71b..067f41a 100644
--- a/src/probabilistic_model/probabilistic_circuit/distributions.py
+++ b/src/probabilistic_model/probabilistic_circuit/distributions.py
@@ -1,5 +1,4 @@
 import copy
-import json
 import random
 
 from typing import Iterable, Tuple, Union, List, Optional, Any, Dict
@@ -90,8 +89,8 @@ def cdf(self, value: Any):
         """
         return self._cdf(self.variable.encode(value))
 
-    def conditional_from_singleton(self, singleton: portion.Interval) \
-            -> Tuple[Optional['DiracDeltaDistribution'], float]:
+    def conditional_from_singleton(self, singleton: portion.Interval) -> Tuple[
+        Optional['DiracDeltaDistribution'], float]:
         """
         Create a dirac impulse from a singleton interval.
 
@@ -232,8 +231,7 @@ def to_json(self) -> Dict[str, Any]:
         return {**super().to_json(), "weights": self.weights}
 
     @classmethod
-    def from_json_with_variables_and_children(cls, data: Dict[str, Any],
-                                              variables: List[Variable],
+    def from_json_with_variables_and_children(cls, data: Dict[str, Any], variables: List[Variable],
                                               children: List['Unit']) -> Self:
         variable = random_events.variables.Variable.from_json(data["variable"])
         return cls(variable, data["weights"])
@@ -285,26 +283,26 @@ class UniformDistribution(ContinuousDistribution):
     Class for uniform distributions over the half-open interval [lower, upper).
     """
 
-    lower: float
+    interval: portion.Interval
     """
-    The included lower bound of the interval.
+    The interval that the Uniform distribution is defined over.
     """
 
-    upper: float
-    """
-    The excluded upper bound of the interval.
-    """
-
-    def __init__(self, variable: Continuous, lower: float, upper: float, parent=None):
+    def __init__(self, variable: Continuous, interval: portion.Interval, parent=None):
         super().__init__(variable, parent)
-        if lower >= upper:
-            raise ValueError("upper has to be greater than lower. lower: {}; upper: {}")
-        self.lower = lower
-        self.upper = upper
+        self.interval = interval
 
     @property
     def domain(self) -> Event:
-        return Event({self.variable: portion.closedopen(self.lower, self.upper)})
+        return Event({self.variable: self.interval})
+
+    @property
+    def lower(self) -> float:
+        return self.interval.lower
+
+    @property
+    def upper(self) -> float:
+        return self.interval.upper
 
     def pdf_value(self) -> float:
         """
@@ -313,7 +311,7 @@ def pdf_value(self) -> float:
         return 1 / (self.upper - self.lower)
 
     def _pdf(self, value: float) -> float:
-        if value in self.interval:
+        if portion.singleton(value) in self.interval:
             return self.pdf_value()
         else:
             return 0
@@ -351,8 +349,9 @@ def _mode(self):
     def sample(self, amount: int) -> List[List[float]]:
         return [[random.uniform(self.lower, self.upper)] for _ in range(amount)]
 
-    def conditional_from_interval(self, interval: portion.Interval) -> Tuple[
-        Optional[Union[DeterministicSumUnit, Self]], float]:
+    def conditional_from_interval(self, interval: portion.Interval) \
+            -> Tuple[Optional[Union[DeterministicSumUnit, Self]], float]:
+
         # calculate the probability of the interval
         probability = self._probability(EncodedEvent({self.variable: interval}))
 
@@ -361,8 +360,8 @@ def conditional_from_interval(self, interval: portion.Interval) -> Tuple[
             return None, 0
 
         # else, form the intersection of the interval and the domain
-        intersection = self.domain[self.variable] & interval
-        resulting_distribution = UniformDistribution(self.variable, intersection.lower, intersection.upper)
+        intersection = self.interval & interval
+        resulting_distribution = UniformDistribution(self.variable, intersection)
         return resulting_distribution, probability
 
     def moment(self, order: OrderType, center: CenterType) -> MomentType:
@@ -387,8 +386,7 @@ def evaluate_integral_at(x) -> float:
 
     def __eq__(self, other):
         return (isinstance(other,
-                           UniformDistribution) and self.lower == other.lower and self.upper == other.upper and super().__eq__(
-            other))
+                           UniformDistribution) and self.interval == other.interval and super().__eq__(other))
 
     def __repr__(self):
         return f"U{self.interval}"
@@ -397,14 +395,13 @@ def __copy__(self):
         return self.__class__(self.variable, self.interval)
 
     def to_json(self) -> Dict[str, Any]:
-        return {**super().to_json(), "lower": self.lower, "upper": self.upper}
+        return {**super().to_json(), "interval": portion.to_data(self.interval)}
 
     @classmethod
-    def from_json_with_variables_and_children(cls, data: Dict[str, Any],
-                                              variables: List[Variable],
+    def from_json_with_variables_and_children(cls, data: Dict[str, Any], variables: List[Variable],
                                               children: List['Unit']) -> Self:
         variable = random_events.variables.Variable.from_json(data["variable"])
-        return cls(variable, data["lower"], data["upper"])
+        return cls(variable, portion.from_data(data["interval"]))
 
 
 class DiracDeltaDistribution(ContinuousDistribution):
@@ -486,8 +483,7 @@ def to_json(self) -> Dict[str, Any]:
         return {**super().to_json(), "location": self.location, "density_cap": self.density_cap}
 
     @classmethod
-    def from_json_with_variables_and_children(cls, data: Dict[str, Any],
-                                              variables: List[Variable],
+    def from_json_with_variables_and_children(cls, data: Dict[str, Any], variables: List[Variable],
                                               children: List['Unit']) -> Self:
         variable = random_events.variables.Variable.from_json(data["variable"])
-        return cls(variable, data["location"], data["density_cap"])
\ No newline at end of file
+        return cls(variable, data["location"], data["density_cap"])
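
A minimal usage sketch of the interval-based UniformDistribution API introduced above. The variable name x is an assumption; the commented values follow from the new lower/upper properties and pdf_value.

import portion
from random_events.variables import Continuous
from probabilistic_model.probabilistic_circuit.distributions import UniformDistribution

x = Continuous("x")  # assumed continuous variable
distribution = UniformDistribution(x, portion.closedopen(0, 2))

distribution.interval     # [0,2), stored as a single portion.Interval instead of separate lower/upper floats
distribution.lower        # 0, delegated to interval.lower
distribution.upper        # 2, delegated to interval.upper
distribution.pdf_value()  # 0.5, i.e. 1 / (upper - lower)
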
diff --git a/test/test_distributions.py b/test/test_distributions.py
index 3318fc7..5659f35 100644
--- a/test/test_distributions.py
+++ b/test/test_distributions.py
@@ -1,7 +1,7 @@
 import unittest
 from probabilistic_model.probabilistic_circuit.distributions import UniformDistribution, SymbolicDistribution, \
     IntegerDistribution, DiracDeltaDistribution
-from probabilistic_model.probabilistic_circuit.units import DeterministicSumUnit
+from probabilistic_model.probabilistic_circuit.units import DeterministicSumUnit, Unit
 from random_events.events import Event, VariableMap
 from random_events.variables import Continuous, Symbolic, Integer
 import portion
@@ -66,8 +66,8 @@ def test_conditional_complex_intersection(self):
         self.assertEqual(probability, 0.75)
         self.assertEqual(len(conditional.children), 2)
         self.assertEqual(conditional.weights, [2 / 3, 1 / 3])
-        self.assertEqual(conditional.children[0].domain[conditional.variables[0]], portion.closedopen(0, 1))
-        self.assertEqual(conditional.children[1].domain[conditional.variables[0]], portion.closedopen(1.5, 2))
+        self.assertEqual(conditional.children[0].interval, portion.closed(0, 1))
+        self.assertEqual(conditional.children[1].interval, portion.closedopen(1.5, 2))
 
     def test_conditional_triple_complex_intersection(self):
         event = Event(
@@ -78,9 +78,9 @@ def test_conditional_triple_complex_intersection(self):
         self.assertEqual(probability, 0.5)
         self.assertEqual(len(conditional.children), 3)
         self.assertEqual(conditional.weights, [1 / 4, 1 / 4, 1 / 2])
-        self.assertEqual(conditional.children[0].domain[conditional.variables[0]], portion.closedopen(0, 0.25))
-        self.assertEqual(conditional.children[1].domain[conditional.variables[0]], portion.closedopen(0.75, 1))
-        self.assertEqual(conditional.children[2].domain[conditional.variables[0]], portion.closedopen(1.5, 2))
+        self.assertEqual(conditional.children[0].interval, portion.closed(0, 0.25))
+        self.assertEqual(conditional.children[1].interval, portion.closed(0.75, 1))
+        self.assertEqual(conditional.children[2].interval, portion.closedopen(1.5, 2))
 
     def test_conditional_mode(self):
         event = Event({
@@ -128,8 +128,9 @@ def test_serialization(self):
         serialization = self.distribution.to_json()
         self.assertEqual(serialization["type"],
                          "probabilistic_model.probabilistic_circuit.distributions.UniformDistribution")
-        self.assertEqual(serialization["lower"], 0)
-        self.assertEqual(serialization["upper"], 2)
+        self.assertEqual(serialization["interval"], [(True, 0, 2, False)])
+        deserialized = Unit.from_json(serialization)
+        self.assertIsInstance(deserialized, UniformDistribution)
 
 
 class SymbolicDistributionTestCase(unittest.TestCase):
diff --git a/test/test_nyga_distribution.py b/test/test_nyga_distribution.py
index 0ff1ab1..8039ee7 100644
--- a/test/test_nyga_distribution.py
+++ b/test/test_nyga_distribution.py
@@ -31,15 +31,15 @@ def test_create_deterministic_uniform_mixture_from_datasets(self):
         right_dataset = [4, 7, 9]
         distribution = NygaDistribution(self.x)
         dsu = distribution._create_deterministic_uniform_mixture_from_datasets(left_dataset, right_dataset)
-        self.assertEqual(dsu.children[0], UniformDistribution(self.x, portion.closedopen(1, 4)))
-        self.assertEqual(dsu.children[1], UniformDistribution(self.x, portion.closed(4, 9)))
+        self.assertEqual(dsu.children[0], UniformDistribution(self.x, portion.closedopen(1, 3.5)))
+        self.assertEqual(dsu.children[1], UniformDistribution(self.x, portion.closed(3.5, 9)))
+        self.assertEqual(dsu.weights, [3/6, 3/6])
 
     def test_compute_best_split(self):
         dataset = [1, 2, 3, 4, 7, 9]
         distribution = NygaDistribution(self.x)
-        maximum_likelihood, best_sum_node = distribution.compute_most_likely_split(dataset)
-        print(best_sum_node)
-        print(maximum_likelihood)
+        maximum_likelihood, best_sum_node, split_index = distribution.compute_most_likely_split(dataset)
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/test_probabilistic_circuit.py b/test/test_probabilistic_circuit.py
index 022be1e..1ec8247 100644
--- a/test/test_probabilistic_circuit.py
+++ b/test/test_probabilistic_circuit.py
@@ -169,7 +169,7 @@ def test_equality(self):
         model_2 = self.model.__copy__()
         self.assertEqual(self.model, model_2)
         real2 = Continuous("real2")
-        model_2 *= UniformDistribution(real2, 0, 1)
+        model_2 *= UniformDistribution(real2, portion.closedopen(0, 1))
         self.assertNotEqual(self.model, model_2)
 
     def test_to_json(self):
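
A sketch of the JSON round trip exercised by the updated test_serialization, assuming the imports used in the test files above.

import portion
from random_events.variables import Continuous
from probabilistic_model.probabilistic_circuit.distributions import UniformDistribution
from probabilistic_model.probabilistic_circuit.units import Unit

x = Continuous("x")  # assumed continuous variable
distribution = UniformDistribution(x, portion.closedopen(0, 2))

serialization = distribution.to_json()
# the interval is stored via portion.to_data, e.g. [(True, 0, 2, False)] for [0, 2)
assert serialization["interval"] == [(True, 0, 2, False)]

deserialized = Unit.from_json(serialization)
assert isinstance(deserialized, UniformDistribution)
assert deserialized.interval == portion.closedopen(0, 2)  # restored via portion.from_data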