# -*- coding: utf-8 -*-
"""Helper function definitions."""
import numpy as np
import statsmodels.api as sm
from scipy.interpolate import interp1d
from scipy.spatial.distance import squareform
# Explicit public API: the only names exported by ``from <module> import *``.
__all__ = ['upper',
           'isPSD',
           'nearestPSD',
           'easy_multivariate_normal',
           'kde_pvalue',
           'create_heterogeneous_simulation'
           ]
def upper(mat):
    '''Return the strictly upper triangle (above the diagonal, k=1) of a matrix as a flat array'''
    rows, cols = np.triu_indices_from(mat, k=1)
    return mat[rows, cols]
def isPSD(mat, tol=1e-8):
    """
    Check whether a matrix is positive semi-definite.

    A matrix passes when every eigenvalue exceeds ``-tol``; the tolerance
    absorbs tiny negative eigenvalues produced by floating-point round-off.
    A Cholesky-based check is deliberately avoided because
    np.linalg.cholesky fails on matrices with exactly 0 valued eigenvalues
    (unlike Matlab), so that method is not appropriate for edge cases.
    Ref: https://goo.gl/qKWWzJ
    """
    # No Hermitian (real-valued, symmetric) assumption is made, hence the
    # general eigvals() rather than the faster but less general eigvalsh().
    eigenvalues = np.linalg.eigvals(mat)
    return np.all(eigenvalues > -tol)
def nearestPSD(A, nit=100):
    """
    Higham (2000) algorithm to find the nearest positive semi-definite matrix that minimizes the Frobenius distance/norm. statsmodels uses something very similar in corr_nearest(), but with spectral SGD to search for a local minima. Reference: https://goo.gl/Eut7UU

    Args:
        A (ndarray): square matrix to project onto the PSD cone
        nit (int): number of iterations to run algorithm; more iterations improves accuracy but increases computation time.

    Returns:
        np.matrix: the nearest positive semi-definite matrix to A
    """
    n = A.shape[0]
    W = np.identity(n)
    # NOTE(review): np.matrix is deprecated in modern numpy; kept here to
    # preserve the existing return type for callers.

    def _getAplus(A):
        # Project onto the PSD cone by zeroing out negative eigenvalues.
        eigval, eigvec = np.linalg.eig(A)
        Q = np.matrix(eigvec)
        xdiag = np.matrix(np.diag(np.maximum(eigval, 0)))
        return Q * xdiag * Q.T

    def _getPs(A, W=None):
        # W-weighted projection onto the PSD matrices.
        W05 = np.matrix(W**.5)
        return W05.I * _getAplus(W05 * A * W05) * W05.I

    def _getPu(A, W=None):
        # Restore the entries fixed by the weight matrix (the diagonal here).
        Aret = np.array(A.copy())
        Aret[W > 0] = np.array(W)[W > 0]
        return np.matrix(Aret)

    # W is the matrix used for the norm (assumed to be Identity matrix here)
    # the algorithm should work for any diagonal W
    deltaS = 0  # Dykstra correction term
    Yk = A.copy()
    for k in range(nit):
        Rk = Yk - deltaS
        Xk = _getPs(Rk, W=W)
        deltaS = Xk - Rk
        Yk = _getPu(Xk, W=W)
    # Double check returned matrix is PSD; if round-off left it slightly
    # indefinite, run the algorithm again on the result.
    if isPSD(Yk):
        return Yk
    else:
        # BUG FIX: the original called nearestPSD(Yk) without returning the
        # result (and without forwarding nit), so this branch returned None.
        return nearestPSD(Yk, nit=nit)
def easy_multivariate_normal(num_obs, num_features, corrs, mu=0.0, sigma=1.0, seed=None, forcePSD=True, return_new_corrs=False, nit=100):
    """
    Function to more easily generate multivariate normal samples provided a correlation matrix or list of correlations (upper triangle of correlation matrix) instead of a covariance matrix. Defaults to returning approximately standard normal (mu = 0; sigma = 1) variates. Unlike numpy, if the desired correlation matrix is not positive-semi-definite, will by default issue a warning and find the nearest PSD correlation matrix and generate data with this matrix. This new matrix can optionally be returned used the return_new_corrs argument.

    Args:
        num_obs (int): number of observations/samples to generate (rows)
        num_features (int): number of features/variables/dimensions to generate (columns)
        corrs (ndarray/list/float): num_features x num_features 2d array, flattened numpy array of length (num_features * (num_features-1)) / 2, or scalar for same correlation on all off-diagonals
        mu (float/list): mean of each feature across observations; default 0.0
        sigma (float/list): sd of each feature across observations; default 1.0
        seed (int): seed the random number generator for reproducibility; default None
        forcePSD (bool): whether to find and use a new correlation matrix if the requested one is not positive semi-definite; default True
        return_new_corrs (bool): return the nearest correlation matrix that is positive semi-definite used to generate data; default False
        nit (int): number of iterations to search for the nearest positive-semi-definite correlation matrix if the requested correlation matrix is not PSD; default 100

    Returns:
        ndarray: correlated data as num_obs x num_features array
    """
    if seed is not None:
        np.random.seed(seed)
    # Broadcast scalar mu/sigma to one value per feature.
    if isinstance(mu, list):
        assert len(
            mu) == num_features, "Number of means must match number of features"
    else:
        mu = [mu] * num_features
    if isinstance(sigma, list):
        assert len(
            sigma) == num_features, "Number of sds must match number of features"
    else:
        sigma = [sigma] * num_features
    # Normalize the three accepted corrs formats to a full square matrix.
    if isinstance(corrs, np.ndarray) and corrs.ndim == 2:
        assert corrs.shape[0] == corrs.shape[1] and np.allclose(corrs, corrs.T) and np.allclose(
            np.diagonal(corrs), np.ones_like(np.diagonal(corrs))), "Correlation matrix must be square symmetric"
    elif (isinstance(corrs, np.ndarray) and corrs.ndim == 1) or isinstance(corrs, list):
        assert len(corrs) == (num_features * (num_features - 1)) / \
            2, "(num_features * (num_features - 1) / 2) correlation values are required for a flattened array or list"
        corrs = squareform(corrs)
        np.fill_diagonal(corrs, 1.0)
    elif isinstance(corrs, (int, float)) and not isinstance(corrs, bool):
        # Scalar (int now accepted too): same correlation on every off-diagonal.
        corrs = np.array(
            [corrs] * int(((num_features * (num_features - 1)) / 2)))
        corrs = squareform(corrs)
        np.fill_diagonal(corrs, 1.0)
    else:
        raise ValueError(
            "Correlations must be num_features x num_feature, flattend numpy array/list or scalar")
    if not isPSD(corrs):
        if forcePSD:
            # Tell user their correlations are being recomputed if they didnt ask to save them as they might not realize
            if not return_new_corrs:
                print(
                    "Correlation matrix is not positive semi-definite. Solved for new correlation matrix.")
            _corrs = np.array(nearestPSD(corrs, nit))
        else:
            # Message fixed to name the real argument (forcePSD, not forcePD).
            raise ValueError("Correlation matrix is not positive semi-definite. Pymer4 will not generate inaccurate multivariate data. Use the forcePSD argument to automatically solve for the closest desired correlation matrix.")
    else:
        _corrs = corrs
    # Rescale correlation matrix by variances, given standard deviations of features.
    # Cov = SD * R * SD. BUG FIX: the original computed R * (SD * SD), which is
    # asymmetric -- and therefore not a valid covariance matrix -- whenever the
    # sigmas differ; for equal sigmas both forms coincide, so this is
    # backward-compatible for the default case.
    sd = np.diag(sigma)
    cov = sd.dot(_corrs).dot(sd)
    X = np.random.multivariate_normal(mu, cov, size=num_obs)
    if return_new_corrs:
        return X, _corrs
    else:
        return X
def kde_pvalue(permutation_distribution, test_statistic, tails=2, kde_grid_size=200):
    """
    Use a KDE to smooth a permutation distribution and use an interpolation to compute p-values a la:
    https://users.aalto.fi/~eglerean/bramila_mantel.m

    Args:
        permutation_distribution (ndarray): array of permuted test statistics
        test_statistic (float): true value of computed test statistic
        tails (int): two-tailed or one-tailed p-value; default two-tailed
        kde_grid_size (int): size of the kde grid to generate; default 200 if len(permutation_distribution) <= 5000 otherwise multiples of 200 corresponding to how many extra permutations were performed in multiples of 5000

    Returns:
        tuple: (p-value, interpolation function mapping a statistic to its CDF value)
    """
    # Scale the grid with the number of permutations, but only if the caller
    # left kde_grid_size at its default of 200.
    if len(permutation_distribution) > 5000 and kde_grid_size == 200:
        kde_grid_size = int(np.round(200 * len(permutation_distribution) / 5000))
    kde = sm.nonparametric.KDEUnivariate(permutation_distribution)
    # Compute robust bandwith of KDE kernel like matlab
    # BandWidth = sig * (4/(3*N))^(1/5);
    # Where sig = MAD / half-normal distribution median (0.6745)
    bw = (np.median(np.abs(permutation_distribution - np.median(permutation_distribution))) / 0.6745) * (4 / (3 * len(permutation_distribution))) ** (1 / 5)
    kde.fit(gridsize=kde_grid_size, fft=False, bw=bw)
    # Get cumulative distribution function of kde fit
    cdf = kde.cdf
    # Learn a linear interpolation function between kde supports and the CDF to look up p-values with
    # This is similar to tdist in scipy to look up p-values
    pdist_func = interp1d(kde.support, cdf, fill_value='extrapolate')
    # Look up CDF values at the statistic and at its negation (the other tail)
    left_p = pdist_func(test_statistic)
    right_p = pdist_func(-1 * test_statistic)
    # Deal with scipy extrapolating p-values out of range: clamp into [0, 1]
    if left_p < 0:
        left_p = 0.000
    elif left_p > 1:
        left_p = 1.000
    if right_p < 0:
        right_p = 0.000
    elif right_p > 1:
        right_p = 1.000
    # Convert CDF values to tail probabilities: for a positive statistic the
    # upper tail is 1 - CDF(stat); for a non-positive one the lookup at -stat
    # is the one that gets flipped instead.
    if test_statistic > 0:
        left_p = 1 - left_p
    else:
        right_p = 1 - right_p
    if tails == 2:
        out = left_p + right_p
    elif tails == 1:
        out = left_p
    # NOTE(review): if tails is neither 1 nor 2, `out` is unbound here and a
    # NameError is raised -- consider validating `tails` up front.
    return out, pdist_func
def create_heterogeneous_simulation(r_within_1, r_within_2, r_between_1, r_between_2, n_variables):
    """Create a heterogeneous multivariate covariance matrix based on:
    Omelka, M. and Hudecova, S. (2013) A comparison of the Mantel test
    with a generalised distance covariance test. Environmetrics,
    Vol. 24, 449-460. DOI: 10.1002/env.2238.

    Builds a (2 * n_variables) x (2 * n_variables) matrix of four half-sized
    within blocks on the diagonal, two mirrored pairs of between blocks off
    the diagonal, zeros elsewhere, and unit variances on the main diagonal.
    """
    half = int(n_variables / 2)
    full = n_variables
    size = 2 * full
    out = np.zeros((size, size))
    # Within-set blocks along the diagonal (alternating r_within_1/r_within_2)
    out[:half, :half] = r_within_1
    out[half:full, half:full] = r_within_2
    out[full:full + half, full:full + half] = r_within_1
    out[full + half:, full + half:] = r_within_2
    # Between-set blocks, each placed below and mirrored above the diagonal
    out[full:full + half, :half] = r_between_1
    out[full + half:, half:full] = r_between_2
    out[:half, full:full + half] = r_between_1
    out[half:full, full + half:] = r_between_2
    # Unit variances
    out[np.diag_indices(size)] = 1
    return out