Skip to content

Commit

Permalink
Started to work on abstract implementation of probabilistic circuits.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsch420 committed Nov 6, 2023
1 parent 2b18d3c commit c7b482d
Show file tree
Hide file tree
Showing 4 changed files with 393 additions and 1 deletion.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
random-events>=1.1.3
random-events>=1.1.4
anytree>=2.9.0
Empty file.
212 changes: 212 additions & 0 deletions src/probabilistic_model/probabilistic_circuit/units.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import itertools
import random
from typing import Iterable, Tuple, List

from anytree import NodeMixin
from random_events.events import EncodedEvent
from random_events.variables import Variable

from probabilistic_model.probabilistic_model import ProbabilisticModel


class Unit(ProbabilisticModel, NodeMixin):
"""
Abstract class for nodes used in a probabilistic circuit
"""

def __init__(self, variables: Iterable[Variable], parent: 'Unit' = None):
self.parent = parent
super().__init__(variables)
NodeMixin.__init__(self)

def variable_indices_of_child(self, child: 'Unit') -> List[int]:
"""
Get the list of the variables' indices in self that are also in child.
:param child: The child to check for.
:return: The indices
"""
return list(index for index, variable in enumerate(self.variables) if variable in child.variables)

def __add__(self, other) -> 'SumUnit':
if not isinstance(other, Unit):
raise ValueError(f"Cannot add a Probabilistic Circuit with {type(other)}.")

joined_variables = set(self.variables).union(other.variables)
result = SumUnit(variables=sorted(joined_variables), weights=[.5, .5])
result.children = [self, other]
return result

def __mul__(self, other) -> 'ProductUnit':
if not isinstance(other, Unit):
raise ValueError(f"Cannot add a Probabilistic Circuit with {type(other)}.")

joined_variables = set(self.variables).union(other.variables)

# check if product is decomposable
if set(self.variables).intersection(other.variables) == set():
result = DecomposableProductUnit(variables=sorted(joined_variables))
else:
result = ProductUnit(variables=sorted(joined_variables))
result.children = [self, other]
return result


class SumUnit(Unit):
"""
Sum node used in a probabilistic circuit
"""

weights: Iterable
"""The weights of the convex sum unit."""

def __init__(self, variables: Iterable[Variable], weights: Iterable, parent: 'Unit' = None):
super().__init__(variables, parent)
self.weights = weights

def _likelihood(self, event: Iterable) -> float:
return sum([weight * child._likelihood(event) for weight, child in zip(self.weights, self.children)])

def _probability(self, event: EncodedEvent) -> float:
return sum([weight * child._probability(event) for weight, child in zip(self.weights, self.children)])

def sample(self, amount: int) -> Iterable:
"""
Sample from the sum node using the latent variable interpretation.
"""

# sample the latent variable
states = random.choices(list(range(len(self.children))), weights=self.weights, k=amount)

# sample from the children
result = []
for index, child in self.children:
result.extend(child.sample(states.count(index)))
return result


class DeterministicSumUnit(SumUnit):
"""
Deterministic sum node used in a probabilistic circuit
"""

def merge_modes_if_one_dimensional(self, modes: List[EncodedEvent]) -> List[EncodedEvent]:
"""
Merge the modes in `modes` to one mode if the model is one dimensional.
:param modes: The modes to merge.
:return: The (possibly) merged modes.
"""
if len(self.variables) > 1:
return modes

# merge modes
mode = modes[0]

for mode_ in modes[1:]:
mode = mode | mode_

return [mode]

def _mode(self) -> Tuple[Iterable[EncodedEvent], float]:
"""
Calculate the mode of the model.
As there may exist multiple modes, this method returns an Iterable of modes and their likelihood.
:return: The internal representation of the mode and the likelihood.
"""
modes = []
likelihoods = []

# gather all modes from the children
for weight, child in zip(self.weights, self.children):
mode, likelihood = child._mode()
modes.append(mode)
likelihoods.append(weight * likelihood)

# get the most likely result
maximum_likelihood = max(likelihoods)

result = []

# gather all results that are maximum likely
for mode, likelihood in zip(modes, likelihoods):
if likelihood == maximum_likelihood:
result.extend(mode)

return self.merge_modes_if_one_dimensional(result), maximum_likelihood

@staticmethod
def from_sum_unit(unit: SumUnit) -> 'DeterministicSumUnit':
"""
Downcast a sum unit to a deterministic sum unit.
:param unit: The sum unit to downcast.
"""
result = DeterministicSumUnit(variables=unit.variables, weights=unit.weights)
result.children = unit.children
return result


class ProductUnit(Unit):
"""
Product node used in a probabilistic circuit
"""


class DecomposableProductUnit(ProductUnit):
"""
Decomposable product node used in a probabilistic circuit
"""

def _likelihood(self, event: Iterable) -> float:
result = 1.

for child in self.children:
indices = self.variable_indices_of_child(child)

partial_event = [event[index] for index in indices]

result = result * child._likelihood(partial_event)

return result

def _probability(self, event: EncodedEvent) -> float:
result = 1.

for child in self.children:
# construct partial event for child
result = result * child._probability(EncodedEvent({variable: event[variable] for variable in
self.variables}))

return result

def _mode(self) -> Tuple[Iterable[EncodedEvent], float]:
"""
Calculate the mode of the model.
As there may exist multiple modes, this method returns an Iterable of modes and their likelihood.
:return: The internal representation of the mode and the likelihood.
"""
modes = []
resulting_likelihood = 1.

# gather all modes from the children
for child in self.children:
mode, likelihood = child._mode()
modes.append(mode)
resulting_likelihood *= likelihood

result = []

# perform the cartesian product of all modes
for mode_combination in itertools.product(*modes):

# form the intersection of the modes inside one cartesian product mode
mode = mode_combination[0]
for mode_ in mode_combination[1:]:
mode = mode | mode_

result.append(mode)

return result, resulting_likelihood
179 changes: 179 additions & 0 deletions test/test_units.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import unittest
from typing import Iterable, Tuple

import portion
from anytree import PreOrderIter
from random_events.events import EncodedEvent, Event
from random_events.variables import Symbolic, Integer, Continuous, Variable

from probabilistic_model.probabilistic_circuit.units import Unit, DeterministicSumUnit


class DummyDistribution(Unit):

def __init__(self, variable: Variable):
super().__init__([variable])

def _likelihood(self, event: Iterable) -> float:
return 2


class DummySymbolicDistribution(Unit):

def __init__(self):
super().__init__((Symbolic('symbol', ('a', 'b', 'c')),))

def _likelihood(self, event: Iterable) -> float:
return 0.5

def _mode(self) -> Tuple[Iterable[EncodedEvent], float]:
mode = EncodedEvent({self.variables[0]: 0})
return [mode], 0.5


class DummyRealDistribution(Unit):

def __init__(self):
super().__init__((Continuous("real"),))

def _likelihood(self, event: Iterable) -> float:
return 2.

def _mode(self) -> Tuple[Iterable[EncodedEvent], float]:
mode = EncodedEvent({self.variables[0]: portion.open(1., 2.)})
return [mode], 2.


class DummyIntegerDistribution(Unit):

def __init__(self):
super().__init__((Integer('integer', (1, 2, 4)),))

def _likelihood(self, event: Iterable) -> float:
return 3.

def _mode(self) -> Tuple[Iterable[EncodedEvent], float]:
mode = EncodedEvent({self.variables[0]: 2})
return [mode], 3


class UnitCreationTestCase(unittest.TestCase):
symbol = Symbolic('symbol', ('a', 'b', 'c'))
integer = Integer('integer', (1, 2, 4))
real = Continuous("real")
variables = (integer, real, symbol)

def test_node_creation(self):
unit = Unit(self.variables)
self.assertEqual(unit.variables, self.variables)
self.assertEqual(unit.parent, None)
self.assertEqual(unit.children, ())

def test_node_creation_with_parent(self):
node = Unit(self.variables)
child = Unit(self.variables, node)
self.assertEqual(child.parent, node)
self.assertEqual(node.children, (child,))

def test_unit_creation_by_summation(self):
unit = DummyDistribution(self.symbol) * DummyDistribution(self.real) * DummyDistribution(self.integer)
self.assertEqual(unit.variables, self.variables)
self.assertEqual(unit.likelihood([1, 2, "a"]), 2 ** 3)
leaves = list(PreOrderIter(unit, filter_=lambda node: node.is_leaf))
self.assertEqual(len(leaves), 3)


class UnitInferenceTestCase(unittest.TestCase):

def test_mode_real_distribution_unit(self):
distribution = DummyRealDistribution()
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 2.)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: portion.open(1., 2.)}))

def test_mode_integer_distribution_unit(self):
distribution = DummyIntegerDistribution()
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 3.)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: 4}))

def test_mode_symbolic_distribution_unit(self):
distribution = DummySymbolicDistribution()
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 0.5)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: "a"}))

def test_likelihood_of_product_unit(self):
distribution = DummyRealDistribution() * DummySymbolicDistribution()
self.assertEqual(distribution.likelihood([1., "a"]), 1.)
distribution *= DummyIntegerDistribution()
self.assertEqual(distribution.likelihood([1, 1., "a"]), 3.)

def test_mode_of_product_unit(self):
distribution = DummyRealDistribution() * DummySymbolicDistribution() * DummyIntegerDistribution()
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 3.)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: 4,
distribution.variables[1]: portion.open(1., 2.),
distribution.variables[2]: "a"}))

def test_likelihood_of_continuous_sum_unit(self):
distribution = DummyRealDistribution() + DummyRealDistribution()
self.assertEqual(distribution.likelihood([1.]), 2.)

def test_mode_of_continuous_deterministic_sum_unit(self):
distribution = DummyRealDistribution() + DummyRealDistribution()
distribution = DeterministicSumUnit.from_sum_unit(distribution)
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 1.)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: portion.open(1., 2.)}))

def test_likelihood_of_integer_sum_unit(self):
distribution = DummyIntegerDistribution() + DummyIntegerDistribution()
self.assertEqual(distribution.likelihood([1]), 3.)

def test_mode_of_integer_deterministic_sum_unit(self):
distribution = DummyIntegerDistribution() + DummyIntegerDistribution()
distribution = DeterministicSumUnit.from_sum_unit(distribution)
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 1.5)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: 4}))

def test_likelihood_of_symbolic_sum_unit(self):
distribution = DummySymbolicDistribution() + DummySymbolicDistribution()
self.assertEqual(distribution.likelihood(["a"]), 0.5)

def test_mode_of_symbolic_deterministic_sum_unit(self):
distribution = DummySymbolicDistribution() + DummySymbolicDistribution()
distribution = DeterministicSumUnit.from_sum_unit(distribution)
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 0.25)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: "a"}))

def test_likelihood_of_mixed_sum_unit(self):
distribution = ((DummySymbolicDistribution() * DummyRealDistribution() * DummyIntegerDistribution()) +
(DummySymbolicDistribution() * DummyRealDistribution() * DummyIntegerDistribution()))
self.assertEqual(distribution.likelihood([1, 1., "a"]), 3.)

def test_mode_of_mixed_deterministic_sum_unit(self):
distribution = ((DummySymbolicDistribution() * DummyRealDistribution() * DummyIntegerDistribution()) +
(DummySymbolicDistribution() * DummyRealDistribution() * DummyIntegerDistribution()))
distribution = DeterministicSumUnit.from_sum_unit(distribution)
distribution.weights = [0.7, 0.3]
mode, likelihood = distribution.mode()
self.assertEqual(likelihood, 3 * 0.7)
self.assertEqual(len(mode), 1)
self.assertEqual(mode[0], Event({distribution.variables[0]: 4,
distribution.variables[1]: portion.open(1., 2.),
distribution.variables[2]: "a"}))


if __name__ == '__main__':
unittest.main()

0 comments on commit c7b482d

Please sign in to comment.