# 聚类（Clustering）之GMM

2022/04/13 17:51

GMM的优点在于速度较快，和K-means差不多，缺点是某一类点数太少的时候收敛性不好。GMM主要难点在于判定某个点到底属于哪个高斯分布，最大期望法(Expectation-Maximization, EM)是处理此问题的一个较好的统计算法。算法核心在于，

1. 任选一些中心点，然后计算每个点归属于这些中心的概率；

2. 不断调整这些中心点的参数（例如高斯分布的中心坐标和协方差矩阵等），给出最大似然的参数；

3. 重复此过程给出最优结果。

gmm.py

"""
Gaussian Mixture Models.

This implementation corresponds to frequentist (non-Bayesian) formulation
of Gaussian Mixture Models.
"""

# Author: Ron Weiss <ronweiss@gmail.com>
#         Fabian Pedregosa <fabian.pedregosa@inria.fr>
#         Bertrand Thirion <bertrand.thirion@inria.fr>

import numpy as np
from scipy import linalg
from time import time

from KMeans import KMeans, check_random_state

EPS = np.finfo(float).eps

def logsumexp(arr, axis=0):
"""Computes the sum of arr assuming arr is in the log domain.

Returns log(sum(exp(arr))) while minimizing the possibility of
over/underflow.

Examples
--------

>>> import numpy as np
>>> from sklearn.utils.extmath import logsumexp
>>> a = np.arange(10)
>>> np.log(np.sum(np.exp(a)))
9.4586297444267107
>>> logsumexp(a)
9.4586297444267107
"""
arr = np.rollaxis(arr, axis)
# Use the max to normalize, as with the log this is what accumulates
# the less errors
vmax = arr.max(axis=0)
out = np.log(np.sum(np.exp(arr - vmax), axis=0))
out += vmax
return out

def log_multivariate_normal_density(X, means, covars, covariance_type='diag'):
"""Compute the log probability under a multivariate Gaussian distribution.

Parameters
----------
X : array_like, shape (n_samples, n_features)
List of n_features-dimensional data points.  Each row corresponds to a
single data point.

means : array_like, shape (n_components, n_features)
List of n_features-dimensional mean vectors for n_components Gaussians.
Each row corresponds to a single mean vector.

covars : array_like
List of n_components covariance parameters for each Gaussian. The shape
depends on covariance_type:
(n_components, n_features)      if 'spherical',
(n_features, n_features)    if 'tied',
(n_components, n_features)    if 'diag',
(n_components, n_features, n_features) if 'full'

covariance_type : string
Type of the covariance parameters.  Must be one of
'spherical', 'tied', 'diag', 'full'.  Defaults to 'diag'.

Returns
-------
lpr : array_like, shape (n_samples, n_components)
Array containing the log probabilities of each data point in
X under each of the n_components multivariate Gaussian distributions.
"""
log_multivariate_normal_density_dict = {
'spherical': _log_multivariate_normal_density_spherical,
'tied': _log_multivariate_normal_density_tied,
'diag': _log_multivariate_normal_density_diag,
'full': _log_multivariate_normal_density_full}
return log_multivariate_normal_density_dict[covariance_type](
X, means, covars)

def sample_gaussian(mean, covar, covariance_type='diag', n_samples=1,
random_state=None):
"""Generate random samples from a Gaussian distribution.

Parameters
----------
mean : array_like, shape (n_features,)
Mean of the distribution.

covar : array_like, optional
Covariance of the distribution. The shape depends on covariance_type:
scalar if 'spherical',
(n_features) if 'diag',
(n_features, n_features)  if 'tied', or 'full'

covariance_type : string, optional
Type of the covariance parameters.  Must be one of
'spherical', 'tied', 'diag', 'full'.  Defaults to 'diag'.

n_samples : int, optional
Number of samples to generate. Defaults to 1.

Returns
-------
X : array, shape (n_features, n_samples)
Randomly generated sample
"""
rng = check_random_state(random_state)
n_dim = len(mean)
rand = rng.randn(n_dim, n_samples)
if n_samples == 1:
rand.shape = (n_dim,)

if covariance_type == 'spherical':
rand *= np.sqrt(covar)
elif covariance_type == 'diag':
rand = np.dot(np.diag(np.sqrt(covar)), rand)
else:
s, U = linalg.eigh(covar)
s.clip(0, out=s)        # get rid of tiny negatives
np.sqrt(s, out=s)
U *= s
rand = np.dot(U, rand)

return (rand.T + mean).T

class GMM:
"""Gaussian Mixture Model

Representation of a Gaussian mixture model probability distribution.
This class allows for easy evaluation of, sampling from, and
maximum-likelihood estimation of the parameters of a GMM distribution.

Initializes parameters such that every mixture component has zero
mean and identity covariance.

Parameters
----------
n_components : int, optional
Number of mixture components. Defaults to 1.

covariance_type : string, optional
String describing the type of covariance parameters to
use.  Must be one of 'spherical', 'tied', 'diag', 'full'.
Defaults to 'diag'.

random_state: RandomState or an int seed (None by default)
A random number generator instance

min_covar : float, optional
Floor on the diagonal of the covariance matrix to prevent
overfitting.  Defaults to 1e-3.

tol : float, optional
Convergence threshold. EM iterations will stop when average
gain in log-likelihood is below this threshold.  Defaults to 1e-3.

n_iter : int, optional
Number of EM iterations to perform.

n_init : int, optional
Number of initializations to perform. the best results is kept

params : string, optional
Controls which parameters are updated in the training
process.  Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars.  Defaults to 'wmc'.

init_params : string, optional
Controls which parameters are updated in the initialization
process.  Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars.  Defaults to 'wmc'.

verbose : int, default: 0
Enable verbose output. If 1 then it always prints the current
initialization and iteration step. If greater than 1 then
it prints additionally the change and time needed for each step.

Attributes
----------
weights_ : array, shape (n_components,)
This attribute stores the mixing weights for each mixture component.

means_ : array, shape (n_components, n_features)
Mean parameters for each mixture component.

covars_ : array
Covariance parameters for each mixture component.  The shape
depends on covariance_type::

(n_components, n_features)             if 'spherical',
(n_features, n_features)               if 'tied',
(n_components, n_features)             if 'diag',
(n_components, n_features, n_features) if 'full'

converged_ : bool
True when convergence was reached in fit(), False otherwise.

--------

DPGMM : Infinite gaussian mixture model, using the dirichlet
process, fit with a variational algorithm

VBGMM : Finite gaussian mixture model fit with a variational
algorithm, better for situations where there might be too little
data to get a good estimate of the covariance matrix.

Examples
--------

>>> import numpy as np
>>> from sklearn import mixture
>>> np.random.seed(1)
>>> g = mixture.GMM(n_components=2)
>>> # Generate random observations with two modes centered on 0
>>> # and 10 to use for training.
>>> obs = np.concatenate((np.random.randn(100, 1),
...                       10 + np.random.randn(300, 1)))
>>> g.fit(obs) # doctest: +NORMALIZE_WHITESPACE
GMM(covariance_type='diag', init_params='wmc', min_covar=0.001,
n_components=2, n_init=1, n_iter=100, params='wmc',
random_state=None, tol=0.001, verbose=0)
>>> np.round(g.weights_, 2)
array([ 0.75,  0.25])
>>> np.round(g.means_, 2)
array([[ 10.05],
[  0.06]])
>>> np.round(g.covars_, 2) #doctest: +SKIP
array([[[ 1.02]],
[[ 0.96]]])
>>> g.predict([[0], [2], [9], [10]]) #doctest: +ELLIPSIS
array([1, 1, 0, 0]...)
>>> np.round(g.score([[0], [2], [9], [10]]), 2)
array([-2.19, -4.58, -1.75, -1.21])
>>> # Refit the model on new data (initial parameters remain the
>>> # same), this time with an even split between the two modes.
>>> g.fit(20 * [[0]] +  20 * [[10]]) # doctest: +NORMALIZE_WHITESPACE
GMM(covariance_type='diag', init_params='wmc', min_covar=0.001,
n_components=2, n_init=1, n_iter=100, params='wmc',
random_state=None, tol=0.001, verbose=0)
>>> np.round(g.weights_, 2)
array([ 0.5,  0.5])

"""

def __init__(self, n_components=1, covariance_type='diag',
random_state=None, tol=1e-3, min_covar=1e-3,
n_iter=100, n_init=1, params='wmc', init_params='wmc',
verbose=0):
self.n_components = n_components
self.covariance_type = covariance_type
self.tol = tol
self.min_covar = min_covar
self.random_state = random_state
self.n_iter = n_iter
self.n_init = n_init
self.params = params
self.init_params = init_params
self.verbose = verbose

if covariance_type not in ['spherical', 'tied', 'diag', 'full']:
raise ValueError('Invalid value for covariance_type: %s' %
covariance_type)

if n_init < 1:
raise ValueError('GMM estimation requires at least one run')

self.weights_ = np.ones(self.n_components) / self.n_components

# flag to indicate exit status of fit() method: converged (True) or
# n_iter reached (False)
self.converged_ = False

def _get_covars(self):
"""Covariance parameters for each mixture component.

The shape depends on cvtype::

(n_states, n_features)                if 'spherical',
(n_features, n_features)              if 'tied',
(n_states, n_features)                if 'diag',
(n_states, n_features, n_features)    if 'full'

"""
if self.covariance_type == 'full':
return self.covars_
elif self.covariance_type == 'diag':
return [np.diag(cov) for cov in self.covars_]
elif self.covariance_type == 'tied':
return [self.covars_] * self.n_components
elif self.covariance_type == 'spherical':
return [np.diag(cov) for cov in self.covars_]

def _set_covars(self, covars):
"""Provide values for covariance"""
covars = np.asarray(covars)
_validate_covars(covars, self.covariance_type, self.n_components)
self.covars_ = covars

def score_samples(self, X):
"""Return the per-sample likelihood of the data under the model.

Compute the log probability of X under the model and
return the posterior distribution (responsibilities) of each
mixture component for each element of X.

Parameters
----------
X: array_like, shape (n_samples, n_features)
List of n_features-dimensional data points. Each row
corresponds to a single data point.

Returns
-------
logprob : array_like, shape (n_samples,)
Log probabilities of each data point in X.

responsibilities : array_like, shape (n_samples, n_components)
Posterior probabilities of each mixture component for each
observation
"""
if X.ndim == 1:
X = X[:, np.newaxis]
if X.size == 0:
return np.array([]), np.empty((0, self.n_components))
if X.shape[1] != self.means_.shape[1]:
raise ValueError('The shape of X  is not compatible with self')

lpr = (log_multivariate_normal_density(X, self.means_, self.covars_,
self.covariance_type) +
np.log(self.weights_))
logprob = logsumexp(lpr, axis=1)
responsibilities = np.exp(lpr - logprob[:, np.newaxis])
return logprob, responsibilities

def score(self, X, y=None):
"""Compute the log probability under the model.

Parameters
----------
X : array_like, shape (n_samples, n_features)
List of n_features-dimensional data points.  Each row
corresponds to a single data point.

Returns
-------
logprob : array_like, shape (n_samples,)
Log probabilities of each data point in X
"""
logprob, _ = self.score_samples(X)
return logprob

def predict(self, X):
"""Predict label for data.

Parameters
----------
X : array-like, shape = [n_samples, n_features]

Returns
-------
C : array, shape = (n_samples,) component memberships
"""
logprob, responsibilities = self.score_samples(X)
return responsibilities.argmax(axis=1)

def predict_proba(self, X):
"""Predict posterior probability of data under each Gaussian
in the model.

Parameters
----------
X : array-like, shape = [n_samples, n_features]

Returns
-------
responsibilities : array-like, shape = (n_samples, n_components)
Returns the probability of the sample for each Gaussian
(state) in the model.
"""
logprob, responsibilities = self.score_samples(X)
return responsibilities

def sample(self, n_samples=1, random_state=None):
"""Generate random samples from the model.

Parameters
----------
n_samples : int, optional
Number of samples to generate. Defaults to 1.

Returns
-------
X : array_like, shape (n_samples, n_features)
List of samples
"""
if random_state is None:
random_state = self.random_state
random_state = check_random_state(random_state)
weight_cdf = np.cumsum(self.weights_)

X = np.empty((n_samples, self.means_.shape[1]))
rand = random_state.rand(n_samples)
# decide which component to use for each sample
comps = weight_cdf.searchsorted(rand)
# for each component, generate all needed samples
for comp in range(self.n_components):
# occurrences of current component in X
comp_in_X = (comp == comps)
# number of those occurrences
num_comp_in_X = comp_in_X.sum()
if num_comp_in_X > 0:
if self.covariance_type == 'tied':
cv = self.covars_
elif self.covariance_type == 'spherical':
cv = self.covars_[comp][0]
else:
cv = self.covars_[comp]
X[comp_in_X] = sample_gaussian(
self.means_[comp], cv, self.covariance_type,
num_comp_in_X, random_state=random_state).T
return X

def fit_predict(self, X, y=None):
"""Fit and then predict labels for data.

Warning: due to the final maximization step in the EM algorithm,
with low iterations the prediction may not be 100% accurate

Parameters
----------
X : array-like, shape = [n_samples, n_features]

Returns
-------
C : array, shape = (n_samples,) component memberships
"""
return self._fit(X, y).argmax(axis=1)

def _fit(self, X, y=None, do_prediction=False):
"""Estimate model parameters with the EM algorithm.

A initialization step is performed before entering the
expectation-maximization (EM) algorithm. If you want to avoid
this step, set the keyword argument init_params to the empty
string '' when creating the GMM object. Likewise, if you would
like just to do an initialization, set n_iter=0.

Parameters
----------
X : array_like, shape (n, n_features)
List of n_features-dimensional data points.  Each row
corresponds to a single data point.

Returns
-------
responsibilities : array, shape (n_samples, n_components)
Posterior probabilities of each mixture component for each
observation.
"""
# initialization step
if X.shape[0] < self.n_components:
raise ValueError(
'GMM estimation with %s components, but got only %s samples' %
(self.n_components, X.shape[0]))

max_log_prob = -np.infty

if self.verbose > 0:
print('Expectation-maximization algorithm started.')

for init in range(self.n_init):
if self.verbose > 0:
print('Initialization ' + str(init + 1))
start_init_time = time()

if 'm' in self.init_params or not hasattr(self, 'means_'):
self.means_ = KMeans(
n_clusters=self.n_components,
random_state=self.random_state).fit(X).cluster_centers_
if self.verbose > 1:
print('\tMeans have been initialized.')

if 'w' in self.init_params or not hasattr(self, 'weights_'):
self.weights_ = np.tile(1.0 / self.n_components,
self.n_components)
if self.verbose > 1:
print('\tWeights have been initialized.')

if 'c' in self.init_params or not hasattr(self, 'covars_'):
cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1])
if not cv.shape:
cv.shape = (1, 1)
self.covars_ = \
distribute_covar_matrix_to_match_covariance_type(
cv, self.covariance_type, self.n_components)
if self.verbose > 1:
print('\tCovariance matrices have been initialized.')

# EM algorithms
current_log_likelihood = None
# reset self.converged_ to False
self.converged_ = False

for i in range(self.n_iter):
if self.verbose > 0:
print('\tEM iteration ' + str(i + 1))
start_iter_time = time()
prev_log_likelihood = current_log_likelihood
# Expectation step
log_likelihoods, responsibilities = self.score_samples(X)
current_log_likelihood = log_likelihoods.mean()

# Check for convergence.
if prev_log_likelihood is not None:
change = abs(current_log_likelihood - prev_log_likelihood)
if self.verbose > 1:
print('\t\tChange: ' + str(change))
if change < self.tol:
self.converged_ = True
if self.verbose > 0:
print('\t\tEM algorithm converged.')
break

# Maximization step
self._do_mstep(X, responsibilities, self.params,
self.min_covar)
if self.verbose > 1:
print('\t\tEM iteration ' + str(i + 1) + ' took {0:.5f}s'.format(
time() - start_iter_time))

# if the results are better, keep it
if self.n_iter:
if current_log_likelihood > max_log_prob:
max_log_prob = current_log_likelihood
best_params = {'weights': self.weights_,
'means': self.means_,
'covars': self.covars_}
if self.verbose > 1:
print('\tBetter parameters were found.')

if self.verbose > 1:
print('\tInitialization ' + str(init + 1) + ' took {0:.5f}s'.format(
time() - start_init_time))

# check the existence of an init param that was not subject to
# likelihood computation issue.
if np.isneginf(max_log_prob) and self.n_iter:
raise RuntimeError(
"EM algorithm was never able to compute a valid likelihood " +
"given initial parameters. Try different init parameters " +
"(or increasing n_init) or check for degenerate data.")

if self.n_iter:
self.covars_ = best_params['covars']
self.means_ = best_params['means']
self.weights_ = best_params['weights']
else:  # self.n_iter == 0 occurs when using GMM within HMM
# Need to make sure that there are responsibilities to output
# Output zeros because it was just a quick initialization
responsibilities = np.zeros((X.shape[0], self.n_components))

return responsibilities

def fit(self, X, y=None):
"""Estimate model parameters with the EM algorithm.

A initialization step is performed before entering the
expectation-maximization (EM) algorithm. If you want to avoid
this step, set the keyword argument init_params to the empty
string '' when creating the GMM object. Likewise, if you would
like just to do an initialization, set n_iter=0.

Parameters
----------
X : array_like, shape (n, n_features)
List of n_features-dimensional data points.  Each row
corresponds to a single data point.

Returns
-------
self
"""
self._fit(X, y)
return self

def _do_mstep(self, X, responsibilities, params, min_covar=0):
""" Perform the Mstep of the EM algorithm and return the class weights
"""
weights = responsibilities.sum(axis=0)
weighted_X_sum = np.dot(responsibilities.T, X)
inverse_weights = 1.0 / (weights[:, np.newaxis] + 10 * EPS)

if 'w' in params:
self.weights_ = (weights / (weights.sum() + 10 * EPS) + EPS)
if 'm' in params:
self.means_ = weighted_X_sum * inverse_weights
if 'c' in params:
covar_mstep_func = _covar_mstep_funcs[self.covariance_type]
self.covars_ = covar_mstep_func(
self, X, responsibilities, weighted_X_sum, inverse_weights,
min_covar)
return weights

def _n_parameters(self):
"""Return the number of free parameters in the model."""
ndim = self.means_.shape[1]
if self.covariance_type == 'full':
cov_params = self.n_components * ndim * (ndim + 1) / 2.
elif self.covariance_type == 'diag':
cov_params = self.n_components * ndim
elif self.covariance_type == 'tied':
cov_params = ndim * (ndim + 1) / 2.
elif self.covariance_type == 'spherical':
cov_params = self.n_components
mean_params = ndim * self.n_components
return int(cov_params + mean_params + self.n_components - 1)

def bic(self, X):
"""Bayesian information criterion for the current model fit
and the proposed data

Parameters
----------
X : array of shape(n_samples, n_dimensions)

Returns
-------
bic: float (the lower the better)
"""
return (-2 * self.score(X).sum() +
self._n_parameters() * np.log(X.shape[0]))

def aic(self, X):
"""Akaike information criterion for the current model fit
and the proposed data

Parameters
----------
X : array of shape(n_samples, n_dimensions)

Returns
-------
aic: float (the lower the better)
"""
return - 2 * self.score(X).sum() + 2 * self._n_parameters()

#########################################################################
# some helper routines
#########################################################################

def _log_multivariate_normal_density_diag(X, means, covars):
"""Compute Gaussian log-density at X for a diagonal model"""
n_samples, n_dim = X.shape
lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.sum(np.log(covars), 1)
+ np.sum((means ** 2) / covars, 1)
- 2 * np.dot(X, (means / covars).T)
+ np.dot(X ** 2, (1.0 / covars).T))
return lpr

def _log_multivariate_normal_density_spherical(X, means, covars):
"""Compute Gaussian log-density at X for a spherical model"""
cv = covars.copy()
if covars.ndim == 1:
cv = cv[:, np.newaxis]
if covars.shape[1] == 1:
cv = np.tile(cv, (1, X.shape[-1]))
return _log_multivariate_normal_density_diag(X, means, cv)

def _log_multivariate_normal_density_tied(X, means, covars):
"""Compute Gaussian log-density at X for a tied model"""
cv = np.tile(covars, (means.shape[0], 1, 1))
return _log_multivariate_normal_density_full(X, means, cv)

def _log_multivariate_normal_density_full(X, means, covars, min_covar=1.e-7):
"""Log probability for full covariance matrices."""
n_samples, n_dim = X.shape
nmix = len(means)
log_prob = np.empty((n_samples, nmix))
for c, (mu, cv) in enumerate(zip(means, covars)):
# XXX
print("test: ", mu, cv)
try:
cv_chol = linalg.cholesky(cv, lower=True)
except linalg.LinAlgError:
# The model is most probably stuck in a component with too
# few observations, we need to reinitialize this components
try:
cv_chol = linalg.cholesky(cv + min_covar * np.eye(n_dim),
lower=True)
except linalg.LinAlgError:
raise ValueError("'covars' must be symmetric, "
"positive-definite")

cv_log_det = 2 * np.sum(np.log(np.diagonal(cv_chol)))
cv_sol = linalg.solve_triangular(cv_chol, (X - mu).T, lower=True).T
log_prob[:, c] = - .5 * (np.sum(cv_sol ** 2, axis=1) +
n_dim * np.log(2 * np.pi) + cv_log_det)

return log_prob

def _validate_covars(covars, covariance_type, n_components):
"""Do basic checks on matrix covariance sizes and values
"""
from scipy import linalg
if covariance_type == 'spherical':
if len(covars) != n_components:
raise ValueError("'spherical' covars have length n_components")
elif np.any(covars <= 0):
raise ValueError("'spherical' covars must be non-negative")
elif covariance_type == 'tied':
if covars.shape[0] != covars.shape[1]:
raise ValueError("'tied' covars must have shape (n_dim, n_dim)")
elif (not np.allclose(covars, covars.T)
or np.any(linalg.eigvalsh(covars) <= 0)):
raise ValueError("'tied' covars must be symmetric, "
"positive-definite")
elif covariance_type == 'diag':
if len(covars.shape) != 2:
raise ValueError("'diag' covars must have shape "
"(n_components, n_dim)")
elif np.any(covars <= 0):
raise ValueError("'diag' covars must be non-negative")
elif covariance_type == 'full':
if len(covars.shape) != 3:
raise ValueError("'full' covars must have shape "
"(n_components, n_dim, n_dim)")
elif covars.shape[1] != covars.shape[2]:
raise ValueError("'full' covars must have shape "
"(n_components, n_dim, n_dim)")
for n, cv in enumerate(covars):
if (not np.allclose(cv, cv.T)
or np.any(linalg.eigvalsh(cv) <= 0)):
raise ValueError("component %d of 'full' covars must be "
"symmetric, positive-definite" % n)
else:
raise ValueError("covariance_type must be one of " +
"'spherical', 'tied', 'diag', 'full'")

def distribute_covar_matrix_to_match_covariance_type(
tied_cv, covariance_type, n_components):
"""Create all the covariance matrices from a given template"""
if covariance_type == 'spherical':
cv = np.tile(tied_cv.mean() * np.ones(tied_cv.shape[1]),
(n_components, 1))
elif covariance_type == 'tied':
cv = tied_cv
elif covariance_type == 'diag':
cv = np.tile(np.diag(tied_cv), (n_components, 1))
elif covariance_type == 'full':
cv = np.tile(tied_cv, (n_components, 1, 1))
else:
raise ValueError("covariance_type must be one of " +
"'spherical', 'tied', 'diag', 'full'")
return cv

def _covar_mstep_diag(gmm, X, responsibilities, weighted_X_sum, norm,
min_covar):
"""Performing the covariance M step for diagonal cases"""
avg_X2 = np.dot(responsibilities.T, X * X) * norm
avg_means2 = gmm.means_ ** 2
avg_X_means = gmm.means_ * weighted_X_sum * norm
return avg_X2 - 2 * avg_X_means + avg_means2 + min_covar

def _covar_mstep_spherical(*args):
"""Performing the covariance M step for spherical cases"""
cv = _covar_mstep_diag(*args)
return np.tile(cv.mean(axis=1)[:, np.newaxis], (1, cv.shape[1]))

def _covar_mstep_full(gmm, X, responsibilities, weighted_X_sum, norm,
min_covar):
"""Performing the covariance M step for full cases"""
# Eq. 12 from K. Murphy, "Fitting a Conditional Linear Gaussian
# Distribution"
n_features = X.shape[1]
cv = np.empty((gmm.n_components, n_features, n_features))
for c in range(gmm.n_components):
post = responsibilities[:, c]
mu = gmm.means_[c]
diff = X - mu
with np.errstate(under='ignore'):
# Underflow Errors in doing post * X.T are  not important
avg_cv = np.dot(post * diff.T, diff) / (post.sum() + 10 * EPS)
cv[c] = avg_cv + min_covar * np.eye(n_features)
return cv

def _covar_mstep_tied(gmm, X, responsibilities, weighted_X_sum, norm,
min_covar):
# Eq. 15 from K. Murphy, "Fitting a Conditional Linear Gaussian
# Distribution"
avg_X2 = np.dot(X.T, X)
avg_means2 = np.dot(gmm.means_.T, weighted_X_sum)
out = avg_X2 - avg_means2
out *= 1. / X.shape[0]
out.flat[::len(out) + 1] += min_covar
return out

_covar_mstep_funcs = {'spherical': _covar_mstep_spherical,
'diag': _covar_mstep_diag,
'tied': _covar_mstep_tied,
'full': _covar_mstep_full,
}

gmm_test1.py

import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np

from sklearn import datasets
from sklearn.cross_validation import StratifiedKFold
from gmm import GMM

def make_ellipses(gmm, ax):
for n, color in enumerate('rgb'):
v, w = np.linalg.eigh(gmm._get_covars()[n][:2, :2])
u = w[0] / np.linalg.norm(w[0])
angle = np.arctan2(u[1], u[0])
angle = 180 * angle / np.pi  # convert to degrees
v *= 9
ell = mpl.patches.Ellipse(gmm.means_[n, :2], v[0], v[1],
180 + angle, color=color)
ell.set_clip_box(ax.bbox)
ell.set_alpha(0.5)

# Break up the dataset into non-overlapping training (75%) and testing
# (25%) sets.
skf = StratifiedKFold(iris.target, n_folds=4)
# Only take the first fold.
train_index, test_index = next(iter(skf))

X_train = iris.data[train_index]
y_train = iris.target[train_index]
X_test = iris.data[test_index]
y_test = iris.target[test_index]

n_classes = len(np.unique(y_train))

# Try GMMs using different types of covariances.
classifiers = dict((covar_type, GMM(n_components=n_classes,
covariance_type=covar_type, init_params='wc', n_iter=20))
for covar_type in ['spherical', 'diag', 'tied', 'full'])

n_classifiers = len(classifiers)

plt.figure(figsize=(3 * n_classifiers / 2, 6))
left=.01, right=.99)

for index, (name, classifier) in enumerate(classifiers.items()):
# Since we have class labels for the training data, we can
# initialize the GMM parameters in a supervised manner.
classifier.means_ = np.array([X_train[y_train == i].mean(axis=0)
for i in range(0, n_classes)])

# Train the other parameters using the EM algorithm.
classifier.fit(X_train)

h = plt.subplot(2, n_classifiers / 2, index + 1)
make_ellipses(classifier, h)

for n, color in enumerate('rgb'):
data = iris.data[iris.target == n]
plt.scatter(data[:, 0], data[:, 1], 0.8, color=color,
label=iris.target_names[n])
# Plot the test data with crosses
for n, color in enumerate('rgb'):
data = X_test[y_test == n]
plt.plot(data[:, 0], data[:, 1], 'x', color=color)

y_train_pred = classifier.predict(X_train)
train_accuracy = np.mean(y_train_pred.ravel() == y_train.ravel()) * 100
plt.text(0.05, 0.9, 'Train accuracy: %.1f' % train_accuracy,
transform=h.transAxes)

y_test_pred = classifier.predict(X_test)
test_accuracy = np.mean(y_test_pred.ravel() == y_test.ravel()) * 100
plt.text(0.05, 0.8, 'Test accuracy: %.1f' % test_accuracy,
transform=h.transAxes)

plt.xticks(())
plt.yticks(())
plt.title(name)

plt.legend(loc='lower right', prop=dict(size=12))

plt.show()

gmm_test2.py

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from gmm import GMM

n_samples = 300

# generate random sample, two components
np.random.seed(0)

# generate spherical data centered on (20, 20)
shifted_gaussian = np.random.randn(n_samples, 2) + np.array([20, 20])

# generate zero centered stretched Gaussian data
C = np.array([[0., -0.7], [3.5, .7]])
stretched_gaussian = np.dot(np.random.randn(n_samples, 2), C)

# concatenate the two datasets into the final training set
X_train = np.vstack([shifted_gaussian, stretched_gaussian])

# fit a Gaussian Mixture Model with two components
clf = GMM(n_components=2, covariance_type='full')
clf.fit(X_train)

# display predicted scores by the model as a contour plot
x = np.linspace(-20.0, 30.0)
y = np.linspace(-20.0, 40.0)
X, Y = np.meshgrid(x, y)
XX = np.array([X.ravel(), Y.ravel()]).T
Z = -clf.score_samples(XX)[0]
Z = Z.reshape(X.shape)

CS = plt.contour(X, Y, Z, norm=LogNorm(vmin=1.0, vmax=1000.0),
levels=np.logspace(0, 3, 10))
CB = plt.colorbar(CS, shrink=0.8, extend='both')
plt.scatter(X_train[:, 0], X_train[:, 1], .8)

plt.title('Negative log-likelihood predicted by a GMM')
plt.axis('tight')
plt.show()

gmm_test3.py

import itertools

import numpy as np
from scipy import linalg
import matplotlib.pyplot as plt
import matplotlib as mpl

from gmm import GMM

# Number of samples per component
n_samples = 500

# Generate random sample, two components
np.random.seed(0)
C = np.array([[0., -0.1], [1.7, .4]])
X = np.r_[np.dot(np.random.randn(n_samples, 2), C),
.7 * np.random.randn(n_samples, 2) + np.array([-6, 3])]

lowest_bic = np.infty
bic = []
n_components_range = range(1, 7)
cv_types = ['spherical', 'tied', 'diag', 'full']
for cv_type in cv_types:
for n_components in n_components_range:
# Fit a mixture of Gaussians with EM
gmm = GMM(n_components=n_components, covariance_type=cv_type)
gmm.fit(X)
bic.append(gmm.bic(X))
if bic[-1] < lowest_bic:
lowest_bic = bic[-1]
best_gmm = gmm

bic = np.array(bic)
color_iter = itertools.cycle(['k', 'r', 'g', 'b', 'c', 'm', 'y'])
clf = best_gmm
bars = []

# Plot the BIC scores
plt.figure(figsize=(8,6))
spl = plt.subplot(2, 1, 1)
for i, (cv_type, color) in enumerate(zip(cv_types, color_iter)):
xpos = np.array(n_components_range) + .2 * (i - 2)
bars.append(plt.bar(xpos, bic[i * len(n_components_range):
(i + 1) * len(n_components_range)],
width=.2, color=color))
plt.xticks(n_components_range)
plt.ylim([bic.min() * 1.01 - .01 * bic.max(), bic.max()])
plt.title('BIC score per model')
xpos = np.mod(bic.argmin(), len(n_components_range)) + .65 +\
.2 * np.floor(bic.argmin() / len(n_components_range))
plt.text(xpos, bic.min() * 0.97 + .03 * bic.max(), '*', fontsize=14)
spl.set_xlabel('Number of components')
spl.legend([b[0] for b in bars], cv_types)

# Plot the winner
splot = plt.subplot(2, 1, 2)
Y_ = clf.predict(X)
for i, (mean, covar, color) in enumerate(zip(clf.means_, clf.covars_,
color_iter)):
v, w = linalg.eigh(covar)
if not np.any(Y_ == i):
continue
plt.scatter(X[Y_ == i, 0], X[Y_ == i, 1], .8, color=color)

# Plot an ellipse to show the Gaussian component
angle = np.arctan2(w[0][1], w[0][0])
angle = 180 * angle / np.pi  # convert to degrees
v *= 4
ell = mpl.patches.Ellipse(mean, v[0], v[1], 180 + angle, color=color)
ell.set_clip_box(splot.bbox)
ell.set_alpha(.5)

plt.xlim(-10, 10)
plt.ylim(-3, 6)
plt.xticks(())
plt.yticks(())
plt.title('Selected GMM: full model, 2 components')
plt.show()

Variational Inference for Dirichlet Process Mixtures David Blei, Michael Jordan. Bayesian Analysis, 2006

dpgmm.py

"""Bayesian Gaussian Mixture Models and
Dirichlet Process Gaussian Mixture Models"""
from __future__ import print_function

# Author: Alexandre Passos (alexandre.tp@gmail.com)
#         Bertrand Thirion <bertrand.thirion@inria.fr>
#
# Based on mixture.py by:
#         Ron Weiss <ronweiss@gmail.com>
#         Fabian Pedregosa <fabian.pedregosa@inria.fr>
#

import numpy as np
from scipy.special import digamma as _digamma, gammaln as _gammaln
from scipy import linalg
from scipy.spatial.distance import cdist

from gmm import GMM, logsumexp
from KMeans import KMeans, check_random_state, squared_norm

def pinvh(a, cond=None, rcond=None, lower=True):
"""Compute the (Moore-Penrose) pseudo-inverse of a hermetian matrix.

Calculate a generalized inverse of a symmetric matrix using its
eigenvalue decomposition and including all 'large' eigenvalues.

Parameters
----------
a : array, shape (N, N)
Real symmetric or complex hermetian matrix to be pseudo-inverted

cond : float or None, default None
Cutoff for 'small' eigenvalues.
Singular values smaller than rcond * largest_eigenvalue are considered
zero.

If None or -1, suitable machine precision is used.

rcond : float or None, default None (deprecated)
Cutoff for 'small' eigenvalues.
Singular values smaller than rcond * largest_eigenvalue are considered
zero.

If None or -1, suitable machine precision is used.

lower : boolean
Whether the pertinent array data is taken from the lower or upper
triangle of a. (Default: lower)

Returns
-------
B : array, shape (N, N)

Raises
------
LinAlgError
If eigenvalue does not converge

Examples
--------
>>> import numpy as np
>>> a = np.random.randn(9, 6)
>>> a = np.dot(a, a.T)
>>> B = pinvh(a)
>>> np.allclose(a, np.dot(a, np.dot(B, a)))
True
>>> np.allclose(B, np.dot(B, np.dot(a, B)))
True

"""
a = np.asarray_chkfinite(a)
s, u = linalg.eigh(a, lower=lower)

if rcond is not None:
cond = rcond
if cond in [None, -1]:
t = u.dtype.char.lower()
factor = {'f': 1E3, 'd': 1E6}
cond = factor[t] * np.finfo(t).eps

# unlike svd case, eigh can lead to negative eigenvalues
above_cutoff = (abs(s) > cond * np.max(abs(s)))
psigma_diag = np.zeros_like(s)
psigma_diag[above_cutoff] = 1.0 / s[above_cutoff]

return np.dot(u * psigma_diag, np.conjugate(u).T)

def digamma(x):
return _digamma(x + np.finfo(np.float32).eps)

def gammaln(x):
return _gammaln(x + np.finfo(np.float32).eps)

def log_normalize(v, axis=0):
"""Normalized probabilities from unnormalized log-probabilites"""
v = np.rollaxis(v, axis)
v = v.copy()
v -= v.max(axis=0)
out = logsumexp(v)
v = np.exp(v - out)
v += np.finfo(np.float32).eps
v /= np.sum(v, axis=0)
return np.swapaxes(v, 0, axis)

def wishart_log_det(a, b, detB, n_features):
"""Expected value of the log of the determinant of a Wishart

The expected value of the logarithm of the determinant of a
wishart-distributed random variable with the specified parameters."""
l = np.sum(digamma(0.5 * (a - np.arange(-1, n_features - 1))))
l += n_features * np.log(2)
return l + detB

def wishart_logz(v, s, dets, n_features):
"The logarithm of the normalization constant for the wishart distribution"
z = 0.
z += 0.5 * v * n_features * np.log(2)
z += (0.25 * (n_features * (n_features - 1)) * np.log(np.pi))
z += 0.5 * v * np.log(dets)
z += np.sum(gammaln(0.5 * (v - np.arange(n_features) + 1)))
return z

def _bound_wishart(a, B, detB):
"""Returns a function of the dof, scale matrix and its determinant
used as an upper bound in variational approcimation of the evidence"""
n_features = B.shape[0]
logprior = wishart_logz(a, B, detB, n_features)
logprior -= wishart_logz(n_features,
np.identity(n_features),
1, n_features)
logprior += 0.5 * (a - 1) * wishart_log_det(a, B, detB, n_features)
logprior += 0.5 * a * np.trace(B)
return logprior

##############################################################################
# Variational bound on the log likelihood of each class
##############################################################################

"""helper function to calculate symmetric quadratic form x.T * A * x"""
q = (cdist(x, mu[np.newaxis], "mahalanobis", VI=A) ** 2).reshape(-1)
return q

def _bound_state_log_lik(X, initial_bound, precs, means, covariance_type):
"""Update the bound with likelihood terms, for standard covariance types"""
n_components, n_features = means.shape
n_samples = X.shape[0]
bound = np.empty((n_samples, n_components))
bound[:] = initial_bound
if covariance_type in ['diag', 'spherical']:
for k in range(n_components):
d = X - means[k]
bound[:, k] -= 0.5 * np.sum(d * d * precs[k], axis=1)
elif covariance_type == 'tied':
for k in range(n_components):
bound[:, k] -= 0.5 * _sym_quad_form(X, means[k], precs)
elif covariance_type == 'full':
for k in range(n_components):
bound[:, k] -= 0.5 * _sym_quad_form(X, means[k], precs[k])
return bound

class DPGMM(GMM):
"""Variational Inference for the Infinite Gaussian Mixture Model.

DPGMM stands for Dirichlet Process Gaussian Mixture Model, and it
is an infinite mixture model with the Dirichlet Process as a prior
distribution on the number of clusters. In practice the
approximate inference algorithm uses a truncated distribution with
a fixed maximum number of components, but almost always the number
of components actually used depends on the data.

Stick-breaking Representation of a Gaussian mixture model
probability distribution. This class allows for easy and efficient
inference of an approximate posterior distribution over the
parameters of a Gaussian mixture model with a variable number of
components (smaller than the truncation parameter n_components).

Initialization is with normally-distributed means and identity
covariance, for proper convergence.

Parameters
----------
n_components: int, default 1
Number of mixture components.

covariance_type: string, default 'diag'
String describing the type of covariance parameters to
use.  Must be one of 'spherical', 'tied', 'diag', 'full'.

alpha: float, default 1
Real number representing the concentration parameter of
the dirichlet process. Intuitively, the Dirichlet Process
is as likely to start a new cluster for a point as it is
to add that point to a cluster with alpha elements. A
higher alpha means more clusters, as the expected number
of clusters is alpha*log(N).

tol : float, default 1e-3
Convergence threshold.

n_iter : int, default 10
Maximum number of iterations to perform before convergence.

params : string, default 'wmc'
Controls which parameters are updated in the training
process.  Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars.

init_params : string, default 'wmc'
Controls which parameters are updated in the initialization
process.  Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars.  Defaults to 'wmc'.

verbose : int, default 0
Controls output verbosity.

Attributes
----------
covariance_type : string
String describing the type of covariance parameters used by
the DP-GMM.  Must be one of 'spherical', 'tied', 'diag', 'full'.

n_components : int
Number of mixture components.

weights_ : array, shape (n_components,)
Mixing weights for each mixture component.

means_ : array, shape (n_components, n_features)
Mean parameters for each mixture component.

precs_ : array
Precision (inverse covariance) parameters for each mixture
component.  The shape depends on covariance_type::

(n_components, 'n_features')                if 'spherical',
(n_features, n_features)                  if 'tied',
(n_components, n_features)                if 'diag',
(n_components, n_features, n_features)  if 'full'

converged_ : bool
True when convergence was reached in fit(), False otherwise.

--------
GMM : Finite Gaussian mixture model fit with EM

VBGMM : Finite Gaussian mixture model fit with a variational
algorithm, better for situations where there might be too little
data to get a good estimate of the covariance matrix.
"""

def __init__(self, n_components=1, covariance_type='diag', alpha=1.0,
random_state=None, tol=1e-3, verbose=0,
min_covar=None, n_iter=10, params='wmc', init_params='wmc'):
self.alpha = alpha
super(DPGMM, self).__init__(n_components, covariance_type,
random_state=random_state,
tol=tol, min_covar=min_covar,
n_iter=n_iter, params=params,
init_params=init_params, verbose=verbose)

def _get_precisions(self):
"""Return precisions as a full matrix."""
if self.covariance_type == 'full':
return self.precs_
elif self.covariance_type in ['diag', 'spherical']:
return [np.diag(cov) for cov in self.precs_]
elif self.covariance_type == 'tied':
return [self.precs_] * self.n_components

def _get_covars(self):
return [pinvh(c) for c in self._get_precisions()]

def _set_covars(self, covars):
raise NotImplementedError("""The variational algorithm does
not support setting the covariance parameters.""")

def score_samples(self, X):
"""Return the likelihood of the data under the model.

Compute the bound on log probability of X under the model
and return the posterior distribution (responsibilities) of
each mixture component for each element of X.

This is done by computing the parameters for the mean-field of
z for each observation.

Parameters
----------
X : array_like, shape (n_samples, n_features)
List of n_features-dimensional data points.  Each row
corresponds to a single data point.

Returns
-------
logprob : array_like, shape (n_samples,)
Log probabilities of each data point in X
responsibilities: array_like, shape (n_samples, n_components)
Posterior probabilities of each mixture component for each
observation
"""
if X.ndim == 1:
X = X[:, np.newaxis]
z = np.zeros((X.shape[0], self.n_components))
sd = digamma(self.gamma_.T[1] + self.gamma_.T[2])
dgamma1 = digamma(self.gamma_.T[1]) - sd
dgamma2 = np.zeros(self.n_components)
dgamma2[0] = digamma(self.gamma_[0, 2]) - digamma(self.gamma_[0, 1] +
self.gamma_[0, 2])
for j in range(1, self.n_components):
dgamma2[j] = dgamma2[j - 1] + digamma(self.gamma_[j - 1, 2])
dgamma2[j] -= sd[j - 1]
dgamma = dgamma1 + dgamma2
# Free memory and developers cognitive load:
del dgamma1, dgamma2, sd

if self.covariance_type not in ['full', 'tied', 'diag', 'spherical']:
raise NotImplementedError("This ctype is not implemented: %s"
% self.covariance_type)
p = _bound_state_log_lik(X, self._initial_bound + self.bound_prec_,
self.precs_, self.means_,
self.covariance_type)
z = p + dgamma
z = log_normalize(z, axis=-1)
bound = np.sum(z * p, axis=-1)
return bound, z

def _update_concentration(self, z):
"""Update the concentration parameters for each cluster"""
sz = np.sum(z, axis=0)
self.gamma_.T[1] = 1. + sz
self.gamma_.T[2].fill(0)
for i in range(self.n_components - 2, -1, -1):
self.gamma_[i, 2] = self.gamma_[i + 1, 2] + sz[i]
self.gamma_.T[2] += self.alpha

def _update_means(self, X, z):
"""Update the variational distributions for the means"""
n_features = X.shape[1]
for k in range(self.n_components):
if self.covariance_type in ['spherical', 'diag']:
num = np.sum(z.T[k].reshape((-1, 1)) * X, axis=0)
num *= self.precs_[k]
den = 1. + self.precs_[k] * np.sum(z.T[k])
self.means_[k] = num / den
elif self.covariance_type in ['tied', 'full']:
if self.covariance_type == 'tied':
cov = self.precs_
else:
cov = self.precs_[k]
den = np.identity(n_features) + cov * np.sum(z.T[k])
num = np.sum(z.T[k].reshape((-1, 1)) * X, axis=0)
num = np.dot(cov, num)
self.means_[k] = linalg.lstsq(den, num)[0]

def _update_precisions(self, X, z):
"""Update the variational distributions for the precisions"""
n_features = X.shape[1]
if self.covariance_type == 'spherical':
self.dof_ = 0.5 * n_features * np.sum(z, axis=0)
for k in range(self.n_components):
# could be more memory efficient ?
sq_diff = np.sum((X - self.means_[k]) ** 2, axis=1)
self.scale_[k] = 1.
self.scale_[k] += 0.5 * np.sum(z.T[k] * (sq_diff + n_features))
self.bound_prec_[k] = (
0.5 * n_features * (
digamma(self.dof_[k]) - np.log(self.scale_[k])))
self.precs_ = np.tile(self.dof_ / self.scale_, [n_features, 1]).T

elif self.covariance_type == 'diag':
for k in range(self.n_components):
self.dof_[k].fill(1. + 0.5 * np.sum(z.T[k], axis=0))
sq_diff = (X - self.means_[k]) ** 2  # see comment above
self.scale_[k] = np.ones(n_features) + 0.5 * np.dot(
z.T[k], (sq_diff + 1))
self.precs_[k] = self.dof_[k] / self.scale_[k]
self.bound_prec_[k] = 0.5 * np.sum(digamma(self.dof_[k])
- np.log(self.scale_[k]))
self.bound_prec_[k] -= 0.5 * np.sum(self.precs_[k])

elif self.covariance_type == 'tied':
self.dof_ = 2 + X.shape[0] + n_features
self.scale_ = (X.shape[0] + 1) * np.identity(n_features)
for k in range(self.n_components):
diff = X - self.means_[k]
self.scale_ += np.dot(diff.T, z[:, k:k + 1] * diff)
self.scale_ = pinvh(self.scale_)
self.precs_ = self.dof_ * self.scale_
self.det_scale_ = linalg.det(self.scale_)
self.bound_prec_ = 0.5 * wishart_log_det(
self.dof_, self.scale_, self.det_scale_, n_features)
self.bound_prec_ -= 0.5 * self.dof_ * np.trace(self.scale_)

elif self.covariance_type == 'full':
for k in range(self.n_components):
sum_resp = np.sum(z.T[k])
self.dof_[k] = 2 + sum_resp + n_features
self.scale_[k] = (sum_resp + 1) * np.identity(n_features)
diff = X - self.means_[k]
self.scale_[k] += np.dot(diff.T, z[:, k:k + 1] * diff)
self.scale_[k] = pinvh(self.scale_[k])
self.precs_[k] = self.dof_[k] * self.scale_[k]
self.det_scale_[k] = linalg.det(self.scale_[k])
self.bound_prec_[k] = 0.5 * wishart_log_det(
self.dof_[k], self.scale_[k], self.det_scale_[k],
n_features)
self.bound_prec_[k] -= 0.5 * self.dof_[k] * np.trace(
self.scale_[k])

def _monitor(self, X, z, n, end=False):
"""Monitor the lower bound during iteration

Debug method to help see exactly when it is failing to converge as
expected.

Note: this is very expensive and should not be used by default."""
if self.verbose > 0:
print("Bound after updating %8s: %f" % (n, self.lower_bound(X, z)))
if end:
print("Cluster proportions:", self.gamma_.T[1])
print("covariance_type:", self.covariance_type)

def _do_mstep(self, X, z, params):
"""Maximize the variational lower bound

Update each of the parameters to maximize the lower bound."""
self._monitor(X, z, "z")
self._update_concentration(z)
self._monitor(X, z, "gamma")
if 'm' in params:
self._update_means(X, z)
self._monitor(X, z, "mu")
if 'c' in params:
self._update_precisions(X, z)
self._monitor(X, z, "a and b", end=True)

def _initialize_gamma(self):
"Initializes the concentration parameters"
self.gamma_ = self.alpha * np.ones((self.n_components, 3))

def _bound_concentration(self):
"""The variational lower bound for the concentration parameter."""
logprior = gammaln(self.alpha) * self.n_components
logprior += np.sum((self.alpha - 1) * (
digamma(self.gamma_.T[2]) - digamma(self.gamma_.T[1] +
self.gamma_.T[2])))
logprior += np.sum(- gammaln(self.gamma_.T[1] + self.gamma_.T[2]))
logprior += np.sum(gammaln(self.gamma_.T[1]) +
gammaln(self.gamma_.T[2]))
logprior -= np.sum((self.gamma_.T[1] - 1) * (
digamma(self.gamma_.T[1]) - digamma(self.gamma_.T[1] +
self.gamma_.T[2])))
logprior -= np.sum((self.gamma_.T[2] - 1) * (
digamma(self.gamma_.T[2]) - digamma(self.gamma_.T[1] +
self.gamma_.T[2])))
return logprior

def _bound_means(self):
"The variational lower bound for the mean parameters"
logprior = 0.
logprior -= 0.5 * squared_norm(self.means_)
logprior -= 0.5 * self.means_.shape[1] * self.n_components
return logprior

def _bound_precisions(self):
"""Returns the bound term related to precisions"""
logprior = 0.
if self.covariance_type == 'spherical':
logprior += np.sum(gammaln(self.dof_))
logprior -= np.sum(
(self.dof_ - 1) * digamma(np.maximum(0.5, self.dof_)))
logprior += np.sum(- np.log(self.scale_) + self.dof_
- self.precs_[:, 0])
elif self.covariance_type == 'diag':
logprior += np.sum(gammaln(self.dof_))
logprior -= np.sum(
(self.dof_ - 1) * digamma(np.maximum(0.5, self.dof_)))
logprior += np.sum(- np.log(self.scale_) + self.dof_ - self.precs_)
elif self.covariance_type == 'tied':
logprior += _bound_wishart(self.dof_, self.scale_, self.det_scale_)
elif self.covariance_type == 'full':
for k in range(self.n_components):
logprior += _bound_wishart(self.dof_[k],
self.scale_[k],
self.det_scale_[k])
return logprior

def _bound_proportions(self, z):
"""Returns the bound term related to proportions"""
dg12 = digamma(self.gamma_.T[1] + self.gamma_.T[2])
dg1 = digamma(self.gamma_.T[1]) - dg12
dg2 = digamma(self.gamma_.T[2]) - dg12

cz = np.cumsum(z[:, ::-1], axis=-1)[:, -2::-1]
logprior = np.sum(cz * dg2[:-1]) + np.sum(z * dg1)
del cz  # Save memory
z_non_zeros = z[z > np.finfo(np.float32).eps]
logprior -= np.sum(z_non_zeros * np.log(z_non_zeros))
return logprior

def _logprior(self, z):
logprior = self._bound_concentration()
logprior += self._bound_means()
logprior += self._bound_precisions()
logprior += self._bound_proportions(z)
return logprior

def lower_bound(self, X, z):
"""returns a lower bound on model evidence based on X and membership"""
if self.covariance_type not in ['full', 'tied', 'diag', 'spherical']:
raise NotImplementedError("This ctype is not implemented: %s"
% self.covariance_type)
X = np.asarray(X)
if X.ndim == 1:
X = X[:, np.newaxis]
c = np.sum(z * _bound_state_log_lik(X, self._initial_bound +
self.bound_prec_, self.precs_,
self.means_, self.covariance_type))

return c + self._logprior(z)

def _set_weights(self):
for i in range(0, self.n_components):
self.weights_[i] = self.gamma_[i, 1] / (self.gamma_[i, 1]
+ self.gamma_[i, 2])
self.weights_ /= np.sum(self.weights_)

def _fit(self, X, y=None):
"""Estimate model parameters with the variational
algorithm.

For a full derivation and description of the algorithm see
doc/modules/dp-derivation.rst
or
http://scikit-learn.org/stable/modules/dp-derivation.html

A initialization step is performed before entering the em
algorithm. If you want to avoid this step, set the keyword
argument init_params to the empty string '' when when creating
the object. Likewise, if you would like just to do an
initialization, set n_iter=0.

Parameters
----------
X : array_like, shape (n, n_features)
List of n_features-dimensional data points.  Each row
corresponds to a single data point.

Returns
-------
responsibilities : array, shape (n_samples, n_components)
Posterior probabilities of each mixture component for each
observation.
"""
self.random_state_ = check_random_state(self.random_state)

# initialization step
if X.ndim == 1:
X = X[:, np.newaxis]

n_samples, n_features = X.shape
z = np.ones((n_samples, self.n_components))
z /= self.n_components

self._initial_bound = - 0.5 * n_features * np.log(2 * np.pi)
self._initial_bound -= np.log(2 * np.pi * np.e)

if (self.init_params != '') or not hasattr(self, 'gamma_'):
self._initialize_gamma()

if 'm' in self.init_params or not hasattr(self, 'means_'):
self.means_ = KMeans(
n_clusters=self.n_components,
random_state=self.random_state_).fit(X).cluster_centers_[::-1]

if 'w' in self.init_params or not hasattr(self, 'weights_'):
self.weights_ = np.tile(1.0 / self.n_components, self.n_components)

if 'c' in self.init_params or not hasattr(self, 'precs_'):
if self.covariance_type == 'spherical':
self.dof_ = np.ones(self.n_components)
self.scale_ = np.ones(self.n_components)
self.precs_ = np.ones((self.n_components, n_features))
self.bound_prec_ = 0.5 * n_features * (
digamma(self.dof_) - np.log(self.scale_))
elif self.covariance_type == 'diag':
self.dof_ = 1 + 0.5 * n_features
self.dof_ *= np.ones((self.n_components, n_features))
self.scale_ = np.ones((self.n_components, n_features))
self.precs_ = np.ones((self.n_components, n_features))
self.bound_prec_ = 0.5 * (np.sum(digamma(self.dof_) -
np.log(self.scale_), 1))
self.bound_prec_ -= 0.5 * np.sum(self.precs_, 1)
elif self.covariance_type == 'tied':
self.dof_ = 1.
self.scale_ = np.identity(n_features)
self.precs_ = np.identity(n_features)
self.det_scale_ = 1.
self.bound_prec_ = 0.5 * wishart_log_det(
self.dof_, self.scale_, self.det_scale_, n_features)
self.bound_prec_ -= 0.5 * self.dof_ * np.trace(self.scale_)
elif self.covariance_type == 'full':
self.dof_ = (1 + self.n_components + n_samples)
self.dof_ *= np.ones(self.n_components)
self.scale_ = [2 * np.identity(n_features)
for _ in range(self.n_components)]
self.precs_ = [np.identity(n_features)
for _ in range(self.n_components)]
self.det_scale_ = np.ones(self.n_components)
self.bound_prec_ = np.zeros(self.n_components)
for k in range(self.n_components):
self.bound_prec_[k] = wishart_log_det(
self.dof_[k], self.scale_[k], self.det_scale_[k],
n_features)
self.bound_prec_[k] -= (self.dof_[k] *
np.trace(self.scale_[k]))
self.bound_prec_ *= 0.5

# EM algorithms
current_log_likelihood = None
# reset self.converged_ to False
self.converged_ = False

for i in range(self.n_iter):
prev_log_likelihood = current_log_likelihood
# Expectation step
curr_logprob, z = self.score_samples(X)

current_log_likelihood = (
curr_logprob.mean() + self._logprior(z) / n_samples)

# Check for convergence.
if prev_log_likelihood is not None:
change = abs(current_log_likelihood - prev_log_likelihood)
if change < self.tol:
self.converged_ = True
break

# Maximization step
self._do_mstep(X, z, self.params)

if self.n_iter == 0:
# Need to make sure that there is a z value to output
# Output zeros because it was just a quick initialization
z = np.zeros((X.shape[0], self.n_components))

self._set_weights()

return z

class VBGMM(DPGMM):
"""Variational Inference for the Gaussian Mixture Model

Variational inference for a Gaussian mixture model probability
distribution. This class allows for easy and efficient inference
of an approximate posterior distribution over the parameters of a
Gaussian mixture model with a fixed number of components.

Initialization is with normally-distributed means and identity
covariance, for proper convergence.

Parameters
----------
n_components: int, default 1
Number of mixture components.

covariance_type: string, default 'diag'
String describing the type of covariance parameters to
use.  Must be one of 'spherical', 'tied', 'diag', 'full'.

alpha: float, default 1
Real number representing the concentration parameter of
the dirichlet distribution. Intuitively, the higher the
value of alpha the more likely the variational mixture of
Gaussians model will use all components it can.

tol : float, default 1e-3
Convergence threshold.

n_iter : int, default 10
Maximum number of iterations to perform before convergence.

params : string, default 'wmc'
Controls which parameters are updated in the training
process.  Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars.

init_params : string, default 'wmc'
Controls which parameters are updated in the initialization
process.  Can contain any combination of 'w' for weights,
'm' for means, and 'c' for covars.  Defaults to 'wmc'.

verbose : int, default 0
Controls output verbosity.

Attributes
----------
covariance_type : string
String describing the type of covariance parameters used by
the DP-GMM.  Must be one of 'spherical', 'tied', 'diag', 'full'.

n_features : int
Dimensionality of the Gaussians.

Number of mixture components.

weights_ : array, shape (n_components,)
Mixing weights for each mixture component.

means_ : array, shape (n_components, n_features)
Mean parameters for each mixture component.

precs_ : array
Precision (inverse covariance) parameters for each mixture
component.  The shape depends on covariance_type::

(n_components, 'n_features')                if 'spherical',
(n_features, n_features)                  if 'tied',
(n_components, n_features)                if 'diag',
(n_components, n_features, n_features)  if 'full'

converged_ : bool
True when convergence was reached in fit(), False
otherwise.

--------
GMM : Finite Gaussian mixture model fit with EM
DPGMM : Infinite Gaussian mixture model, using the dirichlet
process, fit with a variational algorithm
"""

def __init__(self, n_components=1, covariance_type='diag', alpha=1.0,
random_state=None, tol=1e-3, verbose=0,
min_covar=None, n_iter=10, params='wmc', init_params='wmc'):
super(VBGMM, self).__init__(
n_components, covariance_type, random_state=random_state,
tol=tol, verbose=verbose, min_covar=min_covar,
n_iter=n_iter, params=params, init_params=init_params)
self.alpha = float(alpha) / n_components

def score_samples(self, X):
"""Return the likelihood of the data under the model.

Compute the bound on log probability of X under the model
and return the posterior distribution (responsibilities) of
each mixture component for each element of X.

This is done by computing the parameters for the mean-field of
z for each observation.

Parameters
----------
X : array_like, shape (n_samples, n_features)
List of n_features-dimensional data points.  Each row
corresponds to a single data point.

Returns
-------
logprob : array_like, shape (n_samples,)
Log probabilities of each data point in X
responsibilities: array_like, shape (n_samples, n_components)
Posterior probabilities of each mixture component for each
observation
"""
if X.ndim == 1:
X = X[:, np.newaxis]
dg = digamma(self.gamma_) - digamma(np.sum(self.gamma_))

if self.covariance_type not in ['full', 'tied', 'diag', 'spherical']:
raise NotImplementedError("This ctype is not implemented: %s"
% self.covariance_type)
p = _bound_state_log_lik(X, self._initial_bound + self.bound_prec_,
self.precs_, self.means_,
self.covariance_type)

z = p + dg
z = log_normalize(z, axis=-1)
bound = np.sum(z * p, axis=-1)
return bound, z

def _update_concentration(self, z):
for i in range(self.n_components):
self.gamma_[i] = self.alpha + np.sum(z.T[i])

def _initialize_gamma(self):
self.gamma_ = self.alpha * np.ones(self.n_components)

def _bound_proportions(self, z):
logprior = 0.
dg = digamma(self.gamma_)
dg -= digamma(np.sum(self.gamma_))
logprior += np.sum(dg.reshape((-1, 1)) * z.T)
z_non_zeros = z[z > np.finfo(np.float32).eps]
logprior -= np.sum(z_non_zeros * np.log(z_non_zeros))
return logprior

def _bound_concentration(self):
logprior = 0.
logprior = gammaln(np.sum(self.gamma_)) - gammaln(self.n_components
* self.alpha)
logprior -= np.sum(gammaln(self.gamma_) - gammaln(self.alpha))
sg = digamma(np.sum(self.gamma_))
logprior += np.sum((self.gamma_ - self.alpha)
* (digamma(self.gamma_) - sg))
return logprior

def _monitor(self, X, z, n, end=False):
"""Monitor the lower bound during iteration

Debug method to help see exactly when it is failing to converge as
expected.

Note: this is very expensive and should not be used by default."""
if self.verbose > 0:
print("Bound after updating %8s: %f" % (n, self.lower_bound(X, z)))
if end:
print("Cluster proportions:", self.gamma_)
print("covariance_type:", self.covariance_type)

def _set_weights(self):
self.weights_[:] = self.gamma_
self.weights_ /= np.sum(self.weights_)


dpgmm_test1.py

import itertools

import numpy as np
from scipy import linalg
import matplotlib.pyplot as plt
import matplotlib as mpl

from gmm import GMM
from dpgmm import DPGMM

# Number of samples per component
n_samples = 500

# Generate random sample, two components
np.random.seed(0)
C = np.array([[0., -0.1], [1.7, .4]])
X = np.r_[np.dot(np.random.randn(n_samples, 2), C),
.7 * np.random.randn(n_samples, 2) + np.array([-6, 3])]

# Fit a mixture of Gaussians with EM using five components
gmm = GMM(n_components=5, covariance_type='full')
gmm.fit(X)

# Fit a Dirichlet process mixture of Gaussians using five components
dpgmm = DPGMM(n_components=5, covariance_type='full')
dpgmm.fit(X)

color_iter = itertools.cycle(['r', 'g', 'b', 'c', 'm'])

for i, (clf, title) in enumerate([(gmm, 'GMM'),
(dpgmm, 'Dirichlet Process GMM')]):
splot = plt.subplot(2, 1, 1 + i)
Y_ = clf.predict(X)
for i, (mean, covar, color) in enumerate(zip(
clf.means_, clf._get_covars(), color_iter)):
v, w = linalg.eigh(covar)
u = w[0] / linalg.norm(w[0])
# as the DP will not use every component it has access to
# unless it needs it, we shouldn't plot the redundant
# components.
if not np.any(Y_ == i):
continue
plt.scatter(X[Y_ == i, 0], X[Y_ == i, 1], .8, color=color)

# Plot an ellipse to show the Gaussian component
angle = np.arctan(u[1] / u[0])
angle = 180 * angle / np.pi  # convert to degrees
ell = mpl.patches.Ellipse(mean, v[0], v[1], 180 + angle, color=color)
ell.set_clip_box(splot.bbox)
ell.set_alpha(0.5)

plt.xlim(-10, 10)
plt.ylim(-3, 6)
plt.xticks(())
plt.yticks(())
plt.title(title)

plt.show()

dpgmm_test2.py

import itertools

import numpy as np
from scipy import linalg
import matplotlib.pyplot as plt
import matplotlib as mpl

from gmm import GMM
from dpgmm import DPGMM
#from sklearn.externals.six.moves import xrange

# Number of samples per component
n_samples = 100

# Generate random sample following a sine curve
np.random.seed(0)
X = np.zeros((n_samples, 2))
step = 4 * np.pi / n_samples

for i in range(0, X.shape[0]):
x = i * step - 6
X[i, 0] = x + np.random.normal(0, 0.1)
X[i, 1] = 3 * (np.sin(x) + np.random.normal(0, .2))

color_iter = itertools.cycle(['r', 'g', 'b', 'c', 'm'])

plt.figure(figsize=(8,6))
for i, (clf, title) in enumerate([
(GMM(n_components=10, covariance_type='full', n_iter=100),
"Expectation-maximization"),
(DPGMM(n_components=10, covariance_type='full', alpha=0.01,
n_iter=100),
"Dirichlet Process,alpha=0.01"),
(DPGMM(n_components=10, covariance_type='diag', alpha=100.,
n_iter=100),
"Dirichlet Process,alpha=100.")]):

clf.fit(X)
splot = plt.subplot(3, 1, 1 + i)
Y_ = clf.predict(X)
for i, (mean, covar, color) in enumerate(zip(
clf.means_, clf._get_covars(), color_iter)):
v, w = linalg.eigh(covar)
u = w[0] / linalg.norm(w[0])
# as the DP will not use every component it has access to
# unless it needs it, we shouldn't plot the redundant
# components.
if not np.any(Y_ == i):
continue
plt.scatter(X[Y_ == i, 0], X[Y_ == i, 1], .8, color=color)

# Plot an ellipse to show the Gaussian component
angle = np.arctan(u[1] / u[0])
angle = 180 * angle / np.pi  # convert to degrees
ell = mpl.patches.Ellipse(mean, v[0], v[1], 180 + angle, color=color)
ell.set_clip_box(splot.bbox)
ell.set_alpha(0.5)

plt.xlim(-6, 4 * np.pi - 6)
plt.ylim(-5, 5)
plt.title(title)
plt.xticks(())
plt.yticks(())

plt.show()

0 评论
0 收藏
0