# -*- coding: utf-8 -*-
#! /usr/bin/env python
from __future__ import absolute_import
import numpy as np
from six.moves import range
[docs]def mvar_gen(Acf, npoints, omit=500):
"""
Generating data point from MVAR coefficiencs matrix *Acf*.
Args:
*Acf* : numpy.array
array in shape of (p,k,k) where *k* is number of channels and
*p* is a model order.
*npoints* : int
number of data points.
Returns:
*y* : numpy.array
(k, npoints) data points
"""
p, chn, chn = Acf.shape
y = np.zeros((chn, npoints+omit))
sigma = np.diag(np.ones(chn))
mu = np.zeros(chn)
for i in range(p, npoints+omit):
eps = np.random.multivariate_normal(mu, sigma)
for k in range(0, p):
yt = y[:, i-k-1].reshape((chn, 1))
y[:, i] += np.squeeze(np.dot(Acf[k], yt))
y[:, i] += eps
return y[:, omit:]
[docs]def mvar_gen_inst(Acf, npoints, omit=500):
"""
Generating data point from matrix *A* with MVAR coefficients but it
takes into account also zerolag interactions. So here Acf[0] means
instantenous interaction not as in *mvar_gen* one data point lagged.
Args:
*Acf* : numpy.array
array in shape of (p,k,k) where *k* is number of channels and
*p* is a model order.
*npoints* : int
number of data points.
Returns:
*y* : np.array
(k, n) data points
"""
p, chn, chn = Acf.shape
y = np.zeros((chn, npoints+omit))
sigma = np.diag(np.ones(chn))
mu = np.zeros(chn)
for i in range(p, npoints+omit):
eps = np.random.multivariate_normal(mu, sigma)
for k in range(0, p):
yt = y[:, i-k].reshape((chn, 1))
y[:, i] += np.squeeze(np.dot(Acf[k], yt))
y[:, i] += eps
return y[:, omit:]
[docs]def stability_mvar(Acf):
"""
Checks stability of MVAR given its parameters matrix *Acf*.
! Be careful when using ! not tested yet.
Implemented according to:
`https://sccn.ucsd.edu/wiki/Chapter_3.1._Stationarity_and_Stability`
Args:
*Acf* : numpy.array
array in shape of (p,k,k) where *k* is number of channels and
*p* is a model order.
Returns:
*stable* : bool
stability flag - when True it is stable.
"""
if len(Acf.shape)==2:
p = 1
chans = Acf.shape[0]
else:
p, chans, chans = Acf.shape
am = np.zeros((chans*p, chans*p))
ii = np.eye(chans*(p-1),chans*(p-1))
for e,a in enumerate(Acf):
am[0:chans, e*chans:(e+1)*chans] = a
am[chans: , :chans*(p-1)] = ii
eigval, eigvec = np.linalg.eig(am)
evl = np.log(np.abs(eigval))
stable = np.all(evl<0)
return stable
[docs]def meanncov(x, y=[], p=0, norm=True):
"""
Wrapper to multichannel case of new covariance *ncov*.
Args:
*x* : numpy.array
multidimensional data (channels, data points, trials).
*y* = [] : numpy.array
multidimensional data. If not given the autocovariance of *x*
will be calculated.
*p* = 0: int
window shift of input data. It can be negative as well.
*norm* = True: bool
normalization - if True the result is divided by length of *x*,
otherwise it is not.
Returns:
*mcov* : np.array
covariance matrix
"""
chn, N, trls = x.shape
for tr in range(trls):
if tr == 0:
if not len(y):
mcov = ncov(x[:, :, tr], p=p, norm=norm)
else:
mcov = ncov(x[:, :, tr], y[:, :, tr], p=p, norm=norm)
continue
if not len(y):
mcov += ncov(x[:, :, tr], p=p, norm=norm)
else:
mcov += ncov(x[:, :, tr], y[:, :, tr], p=p, norm=norm)
return mcov/trls
[docs]def ncov(x, y=[], p=0, norm=True):
"""
New covariance.
Args:
*x* : numpy.array
onedimensional data.
*y* = [] : numpy.array
onedimensional data. If not given the autocovariance of *x*
will be calculated.
*p* = 0: int
window shift of input data. It can be negative as well.
*norm* = True: bool
normalization - if True the result is divided by length of *x*,
otherwise it is not.
Returns:
*kv* : np.array
covariance matrix
"""
C, N = x.shape
cov = np.zeros((C, C, abs(p)+1))
if len(y) == 0:
y = x
if p >= 0:
for r in range(p+1):
cov[:, :, r] = np.dot(x[:, :N-r], y[:, r:].T)
else:
for r in range(abs(p)+1):
idxs = np.arange(-r, x.shape[1]-r)
zy = y.take(idxs, axis=1, mode='wrap')
cov[:, :, r] = np.dot(x[:, :N-r], zy[:, :N-r].T)
if norm:
kv = cov/(N-1)
else:
kv = cov
if p == 0:
kv = np.squeeze(kv)
return kv
[docs]def vieiramorf(y, pmax=1):
"""
Compute multichannel autoregresive model coefficients using
Vieira-Morf algorithm.
Args:
*y* : numpy.array
multichannel data in shape (k, n) for one trial case and
(k, n, tr) for multitrial
k - nr of channels, n -data points, tr - nr of trials
*pmax*: int >0
model order
Returns:
*Ar* : np.array
matrix with parameters matrix (p, k, k) where p - model order,
k - nr of channels
*Vr* : np.array
prediction error matrix (k,k)
References:
.. [1] Marple, Jr S. L., *Digital Spectral Analysis with
Applications*, Prentice-Hall Signal Processing Series, 1987
"""
assert pmax > 0, "pmax > 0"
if y.ndim > 2:
M, N, trls = y.shape
cov_func = meanncov
else:
M, N = y.shape
cov_func = ncov
f, b = y.copy(), y.copy()
pef = cov_func(y, norm=False)
peb = pef.copy()
arf = np.zeros((pmax, M, M))
arb = np.zeros((pmax, M, M))
for k in range(0, pmax):
D = cov_func(f[:, k+1:N], b[:, 0:N-k-1], norm=False)
arf[k, :, :] = np.dot(D, np.linalg.inv(peb))
arb[k, :, :] = np.dot(D.T, np.linalg.inv(pef))
tmp = f[:, k+1:] - np.dot(b[:, :N-k-1].T, arf[k, :, :].T).T
b[:, :N-k-1] = b[:, :N-k-1]-np.dot(f[:, k+1:].T, arb[k, :, :].T).T
f[:, k+1:] = tmp
for i in range(k):
tmpp = arf[i]-np.dot(arf[k], arb[k-i-1])
arb[k-i-1, :, :] = arb[k-i-1, :, :]-np.dot(arb[k, :, :], arf[i, :, :])
arf[i, :, :] = tmpp
peb = cov_func(b[:, :N-k-1], norm=False)
pef = cov_func(f[:, k+1:], norm=False)
return arf, pef/N
[docs]def nutallstrand(y, pmax=1):
"""
Compute multichannel autoregresive model coefficients using
Nutall-Strand algorithm.
Args:
*y* : numpy.array
multichannel data in shape (k, n) for one trial case and
(k, n, tr) for multitrial
k - nr of channels, n -data points, tr - nr of trials
*pmax*: int >0
model order
Returns:
*Ar* : np.array
matrix with parameters matrix (p, k, k) where p - model order,
k - nr of channels
*Vr* : np.array
prediction error matrix (k,k)
References:
.. [1] Marple, Jr S. L., *Digital Spectral Analysis with
Applications*, Prentice-Hall Signal Processing Series, 1987
"""
assert pmax > 0, "pmax > 0"
if y.ndim > 2:
M, N, trls = y.shape
cov_func = meanncov
else:
M, N = y.shape
cov_func = ncov
f, b = y.copy(), y.copy()
pef = cov_func(y, norm=False)
peb = pef.copy()
arf = np.zeros((pmax, M, M))
arb = np.zeros((pmax, M, M))
for k in range(0, pmax):
D = cov_func(f[:, k+1:N], b[:, 0:N-k-1], norm=False)
arf[k, :, :] = 2*np.dot(D, np.linalg.inv(peb + pef))
arb[k, :, :] = 2*np.dot(D.T, np.linalg.inv(pef + peb))
tmp = f[:, k+1:] - np.dot(b[:, :N-k-1].T, arf[k, :, :].T).T
b[:, :N-k-1] = b[:, :N-k-1]-np.dot(f[:, k+1:].T, arb[k, :, :].T).T
f[:, k+1:] = tmp
for i in range(k):
tmpp = arf[i]-np.dot(arf[k], arb[k-i-1])
arb[k-i-1, :, :] = arb[k-i-1, :, :]-np.dot(arb[k, :, :], arf[i, :, :])
arf[i, :, :] = tmpp
peb = cov_func(b[:, :N-k-1], norm=False)
pef = cov_func(f[:, k+1:], norm=False)
return arf, pef/N
[docs]def yulewalker(y, pmax=1):
"""
Compute multichannel autoregresive model coefficients using
Yule-Walker algorithm.
Args:
*y* : numpy.array
multichannel data in shape (k, n) for one trial case and
(k, n, tr) for multitrial
k - nr of channels, n -data points, tr - nr of trials
*pmax*: int >0
model order
Returns:
*Ar* : np.array
matrix with parameters matrix (p, k, k) where p - model order,
k - nr of channels
*Vr* : np.array
prediction error matrix (k,k)
References:
.. [1] Marple, Jr S. L., *Digital Spectral Analysis with
Applications*, Prentice-Hall Signal Processing Series, 1987
"""
assert pmax > 0, "pmax > 0"
if y.ndim > 2:
chn, n, trls = y.shape
rr_f = meanncov(y, p=pmax)
rr_b = meanncov(y, p=-pmax)
else:
chn, n = y.shape
rr_f = ncov(y, p=pmax)
rr_b = ncov(y, p=-1*pmax)
q = np.zeros((pmax*chn, pmax*chn))
acof = np.empty((pmax, chn, chn))
for p in range(pmax):
q[p*chn:(p+1)*chn, :] = np.hstack([rr_f[:, :, x].T if x >= 0 else rr_b[:, :, abs(x)].T for x in range(-1*p, pmax-p)])
req = np.vstack([rr_b[:, :, x].T for x in range(1, pmax+1)])
a_solved = np.linalg.solve(q, req)
var = np.copy(rr_f[:, :, 0])
for p in range(pmax):
acof[p] = a_solved[p*chn:(p+1)*chn, :].T
var -= np.dot(acof[p], rr_b[:, :, p+1].T)
return acof, var
fitting_algorithms = {'yw': yulewalker,
'ns': nutallstrand,
'vm': vieiramorf}