from __future__ import print_function, division from sympy import (Basic, sympify, symbols, Dummy, Lambda, summation, Piecewise, S, cacheit, Sum) from sympy.solvers.solveset import solveset from sympy.stats.rv import NamedArgsMixin, SinglePSpace, SingleDomain import random class SingleDiscreteDistribution(Basic, NamedArgsMixin): """ Discrete distribution of a single variable Serves as superclass for PoissonDistribution etc.... Provides methods for pdf, cdf, and sampling See Also: sympy.stats.crv_types.* """ set = S.Integers def __new__(cls, *args): args = list(map(sympify, args)) return Basic.__new__(cls, *args) @staticmethod def check(*args): pass def sample(self): """ A random realization from the distribution """ icdf = self._inverse_cdf_expression() return floor(icdf(random.uniform(0, 1))) @cacheit def _inverse_cdf_expression(self): """ Inverse of the CDF Used by sample """ x, z = symbols('x, z', real=True, positive=True, cls=Dummy) # Invert CDF try: inverse_cdf = list(solveset(self.cdf(x) - z, x)) except NotImplementedError: inverse_cdf = None if not inverse_cdf or len(inverse_cdf) != 1: raise NotImplementedError("Could not invert CDF") return Lambda(z, inverse_cdf[0]) @cacheit def compute_cdf(self, **kwargs): """ Compute the CDF from the PDF Returns a Lambda """ x, z = symbols('x, z', integer=True, finite=True, cls=Dummy) left_bound = self.set.inf # CDF is integral of PDF from left bound to z pdf = self.pdf(x) cdf = summation(pdf, (x, left_bound, z), **kwargs) # CDF Ensure that CDF left of left_bound is zero cdf = Piecewise((cdf, z >= left_bound), (0, True)) return Lambda(z, cdf) def cdf(self, x, **kwargs): """ Cumulative density function """ return self.compute_cdf(**kwargs)(x) def expectation(self, expr, var, evaluate=True, **kwargs): """ Expectation of expression over distribution """ # TODO: support discrete sets with non integer stepsizes if evaluate: return summation(expr * self.pdf(var), (var, self.set.inf, self.set.sup), **kwargs) else: return Sum(expr * self.pdf(var), (var, self.set.inf, self.set.sup), **kwargs) def __call__(self, *args): return self.pdf(*args) class SingleDiscreteDomain(SingleDomain): pass class SingleDiscretePSpace(SinglePSpace): """ Discrete probability space over a single univariate variable """ is_real = True @property def set(self): return self.distribution.set @property def domain(self): return SingleDiscreteDomain(self.symbol, self.set) def sample(self): """ Internal sample method Returns dictionary mapping RandomSymbol to realization value. """ return {self.value: self.distribution.sample()} def integrate(self, expr, rvs=None, **kwargs): rvs = rvs or (self.value,) if self.value not in rvs: return expr expr = expr.xreplace(dict((rv, rv.symbol) for rv in rvs)) x = self.value.symbol try: return self.distribution.expectation(expr, x, evaluate=False, **kwargs) except Exception: return Sum(expr * self.pdf, (x, self.set.inf, self.set.sup), **kwargs) def compute_cdf(self, expr, **kwargs): if expr == self.value: return self.distribution.compute_cdf(**kwargs) else: raise NotImplementedError() def compute_density(self, expr, **kwargs): if expr == self.value: return self.distribution raise NotImplementedError()