# -*- coding: utf-8 -*-

import numpy as np
import scipy.stats as st
from abc import ABCMeta, abstractmethod
from .mvar.comp import ldl
from .mvarmodel import Mvar
from .aec.utils import filter_band, calc_ampenv, FQ_BANDS

import six
from six.moves import map
from six.moves import range
from six.moves import zip

# Spectrum functions:

[docs]def spectrum(acoef, vcoef, fs=1, resolution=100): """ Generating data point from matrix *A* with MVAR coefficients. Args: *acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points Returns: *A_z* : numpy.array z-transformed A(f) complex matrix in shape (*resolution*, k, k) *H_z* : numpy.array inversion of *A_z* *S_z* : numpy.array spectrum matrix (*resolution*, k, k) References: .. [1] K. J. Blinowska, R. Kus, M. Kaminski (2004) “Granger causality and information flow in multivariate processes” Physical Review E 70, 050902. """ p, k, k = acoef.shape freqs = np.linspace(0, fs*0.5, resolution) A_z = np.zeros((len(freqs), k, k), complex) H_z = np.zeros((len(freqs), k, k), complex) S_z = np.zeros((len(freqs), k, k), complex) I = np.eye(k, dtype=complex) for e, f in enumerate(freqs): epot = np.zeros((p, 1), complex) ce = np.exp(-2.j*np.pi*f*(1./fs)) epot[0] = ce for k in range(1, p): epot[k] = epot[k-1]*ce A_z[e] = I - np.sum([epot[x]*acoef[x] for x in range(p)], axis=0) H_z[e] = np.linalg.inv(A_z[e]) S_z[e] =[e], vcoef), H_z[e].T.conj()) return A_z, H_z, S_z
[docs]def spectrum_inst(acoef, vcoef, fs=1, resolution=100): """ Generating data point from matrix *A* with MVAR coefficients taking into account zero-lag effects. Args: *acoef* : numpy.array array of shape (k, k, p+1) where *k* is number of channels and *p* is a model order. acoef[0] - is (k, k) matrix for zero lag, acoef[1] for one data point lag and so on. *vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points Returns: *A_z* : numpy.array z-transformed A(f) complex matrix in shape (*resolution*, k, k) *H_z* : numpy.array inversion of *A_z* *S_z* : numpy.array spectrum matrix (*resolution*, k, k) References: .. [1] Erla S. et all, Multivariate Autoregressive Model with Instantaneous Effects to Improve Brain Connectivity Estimation, Int. J. Bioelectromagn. 11, 74–79 (2009). """ p, k, k = acoef.shape freqs = np.linspace(0, fs/2, resolution) B_z = np.zeros((len(freqs), k, k), complex) L, U, Lt = ldl(vcoef) Linv = np.linalg.inv(L) I = np.eye(k, dtype=complex) bcoef = np.array([, acoef[x]) for x in range(p)]) b0 = np.eye(k) - Linv for e, f in enumerate(freqs): epot = np.zeros((p, 1), complex) ce = np.exp(-2.j*np.pi*f*(1./fs)) epot[0] = ce for k in range(1, p): epot[k] = epot[k-1]*ce B_z[e] = I - b0 - np.sum([epot[x]*bcoef[x] for x in range(p)], axis=0) return B_z
######################################################################## # Connectivity classes: ########################################################################
[docs]class Connect(six.with_metaclass(ABCMeta, object)): """ Abstract class governing calculation of various connectivity estimators with concrete methods: *short_time*, *significance* and abstract *calculate*. """ def __init__(self): self.values_range = [None, None] # normalization bands self.two_sided = False # only positive, or also negative values
[docs] @abstractmethod def calculate(self): """Abstract method to calculate values of estimators from specific parameters""" pass
[docs] def short_time(self, data, nfft=None, no=None, **params): """ Short-tme version of estimator, where data is windowed into parts of length *nfft* and overlap *no*. *params* catch additional parameters specific for estimator. Args: *data* : numpy.array data matrix (kXN) or (kXNxR) where k - channels, N - data points, R - nr of trials *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *params* : additional parameters specific for chosen estimator Returns: *stvalues* : numpy.array short time values (time points, frequency, k, k), where k is number of channels """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] if e == 0: rescalc = self.calculate(datcut, **params) stvalues = np.zeros((len(slices), rescalc.shape[0], k, k)) stvalues[e] = rescalc continue stvalues[e] = self.calculate(datcut, **params) return stvalues
[docs] def short_time_significance(self, data, Nrep=10, alpha=0.05, nfft=None, no=None, verbose=True, **params): """ Significance of short-tme versions of estimators. It base on bootstrap :func:`Connect.bootstrap` for multitrial case and surrogate data :func:`Connect.surrogate` for one trial. Args: *data* : numpy.array data matrix (kXN) or (kXNxR) where k - channels, N - data points, R - nr of trials *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signi_st* : numpy.array short time significance values in shape of - (tp, k, k) for one sided estimator - (tp, 2, k, k) for two sided where k is number of channels and tp number of time points """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) if self.two_sided: signi_st = np.zeros((len(slices), 2, k, k)) else: signi_st = np.zeros((len(slices), k, k)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] signi_st[e] = self.significance(datcut, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) return signi_st
[docs] def significance(self, data, Nrep=10, alpha=0.05, verbose=True, **params): """ Significance of connectivity estimators. It base on bootstrap :func:`Connect.bootstrap` for multitrial case and surrogate data :func:`Connect.surrogate` for one trial. Args: *data* : numpy.array data matrix (kXN) or (kXNxR) where k - channels, N - data points, R - nr of trials *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signific* : numpy.array significance values, check :func:`Connect.levels` """ if data.ndim > 2: signific = self.bootstrap(data, Nrep=10, alpha=alpha, verbose=verbose, **params) else: signific = self.surrogate(data, Nrep=10, alpha=alpha, verbose=verbose, **params) return signific
[docs] def levels(self, signi, alpha, k): """ Levels of significance Args: *signi* : numpy.array bootstraped values of each channel *alpha* : float type I error rate (significance level) - from 0 to 1 - (1-*alpha*) for onesided estimators (e.g. class:`DTF`) - *alpha* and (1-*alpha*) for twosided (e.g. class:`PSI`) *k* : int number of channels Returns: *ficance* : numpy.array maximal value throughout frequency of score at percentile at level 1-*alpha* - (k, k) for one sided estimator - (2, k, k) for two sided """ if self.two_sided: ficance = np.zeros((2, k, k)) else: ficance = np.zeros((k, k)) for i in range(k): for j in range(k): if self.two_sided: ficance[0][i][j] = np.min(st.scoreatpercentile(signi[:, :, i, j], alpha*100, axis=1)) ficance[1][i][j] = np.max(st.scoreatpercentile(signi[:, :, i, j], (1-alpha)*100, axis=1)) else: ficance[i][j] = np.min(st.scoreatpercentile(signi[:, :, i, j], (1-alpha)*100, axis=1)) return ficance
def __calc_multitrial(self, data, **params): "Calc multitrial averaged estimator for :func:`Connect.bootstrap`" trials = data.shape[2] chosen = np.random.randint(trials, size=trials) bc = np.bincount(chosen) idxbc = np.nonzero(bc)[0] flag = True for num, occurence in zip(idxbc, bc[idxbc]): if occurence > 0: trdata = data[:, :, num] if flag: rescalc = self.calculate(trdata, **params)*occurence flag = False continue rescalc += self.calculate(trdata, **params)*occurence return rescalc/trials
[docs] def bootstrap(self, data, Nrep=100, alpha=0.05, verbose=True, **params): """ Bootstrap - random sampling with replacement of trials. Args: *data* : numpy.array multichannel data matrix *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ for i in range(Nrep): if verbose: print('.', end=' ') if i == 0: tmpsig = self.__calc_multitrial(data, **params) fres, k, k = tmpsig.shape signi = np.zeros((Nrep, fres, k, k)) signi[i] = tmpsig else: signi[i] = self.__calc_multitrial(data, **params) if verbose: print('|') return self.levels(signi, alpha, k)
[docs] def surrogate(self, data, Nrep=100, alpha=0.05, verbose=True, **params): """ Surrogate data testing. Mixing data points in each channel. Significance level in calculated over all *Nrep* surrogate sets. Args: *data* : numpy.array multichannel data matrix *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ k, N = data.shape shdata = data.copy() for i in range(Nrep): if verbose: print('.', end=' ') for ch in range(k): np.random.shuffle(shdata[ch,:]) if i == 0: rtmp = self.calculate(shdata, **params) reskeeper = np.zeros((Nrep, rtmp.shape[0], k, k)) reskeeper[i] = rtmp continue reskeeper[i] = self.calculate(shdata, **params) if verbose: print('|') return self.levels(reskeeper, alpha, k)
[docs]class ConnectAR(six.with_metaclass(ABCMeta, Connect)): """ Inherits from *Connect* class and governs calculation of various connectivity estimators basing on MVAR model methods. It overloads *short_time*, *significance* methods but *calculate* remains abstract. """ def __init__(self): super(ConnectAR, self).__init__() self.values_range = [0, 1]
[docs] def short_time(self, data, nfft=None, no=None, mvarmethod='yw', order=None, resol=None, fs=1): """ It overloads :class:`ConnectAR` method :func:`Connect.short_time`. Short-tme version of estimator, where data is windowed into parts of length *nfft* and overlap *no*. *params* catch additional parameters specific for estimator. Args: *data* : numpy.array data matrix (kXN) or (kXNxR) where k - channels, N - data points, R - nr of trials *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *mvarmethod* = 'yw' : MVAR parameters estimation method all avaiable methods you can find in *fitting_algorithms* *order* = None: MVAR model order; it None, it is set automatically basing on default criterion. *resol* = None: frequency resolution; if None, it is 100. *fs* = 1 : sampling frequency Returns: *stvalues* : numpy.array short time values (time points, frequency, k, k), where k is number of channels """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] ar, vr = Mvar().fit(datcut, order, mvarmethod) if e == 0: rescalc = self.calculate(ar, vr, fs, resol) stvalues = np.zeros((len(slices), rescalc.shape[0], k, k)) stvalues[e] = rescalc continue stvalues[e] = self.calculate(ar, vr, fs, resol) return stvalues
[docs] def short_time_significance(self, data, Nrep=100, alpha=0.05, method='yw', order=None, fs=1, resolution=None, nfft=None, no=None, verbose=True, **params): """ Significance of short-tme versions of estimators. It base on bootstrap :func:`ConnectAR.bootstrap` for multitrial case and surrogate data :func:`ConnectAR.surrogate` for one trial. Args: *data* : numpy.array data matrix (kXN) or (kXNxR) where k - channels, N - data points, R - nr of trials *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *method* = 'yw': str method of MVAR parameters estimation all avaiable methods you can find in *fitting_algorithms* *order* = None : int MVAR model order, if None, it's chosen using default criterion *fs* = 1 : int sampling frequency *resolution* = None : int resolution (if None, it's 100 points) *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signi_st* : numpy.array short time significance values in shape of - (tp, k, k) for one sided estimator - (tp, 2, k, k) for two sided where k is number of channels and tp number of time points """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) signi_st = np.zeros((len(slices), k, k)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] signi_st[e] = self.significance(datcut, method, order=order, resolution=resolution, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) return signi_st
def __calc_multitrial(self, data, method='yw', order=None, fs=1, resolution=None, **params): "Calc multitrial averaged estimator for :func:`ConnectAR.bootstrap`" trials = data.shape[2] chosen = np.random.randint(trials, size=trials) ar, vr = Mvar().fit(data[:, :, chosen], order, method) rescalc = self.calculate(ar, vr, fs, resolution) return rescalc
[docs] def significance(self, data, method, order=None, resolution=None, Nrep=10, alpha=0.05, verbose=True, **params): """ Significance of connectivity estimators. It base on bootstrap :func:`ConnectAR.bootstrap` for multitrial case and surrogate data :func:`ConnectAR.surrogate` for one trial. Args: *data* : numpy.array data matrix *method* = 'yw': str method of MVAR parametersestimation all avaiable methods you can find in *fitting_algorithms* *order* = None : int MVAR model order, if None, it's chosen using default criterion *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *resolution* = None : int resolution (if None, it's 100 points) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signi_st* : numpy.array significance values, check :func:`Connect.levels` """ if data.ndim > 2: signific = self.bootstrap(data, method, order=order, resolution=resolution, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) else: signific = self.surrogate(data, method, order=order, resolution=resolution, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) return signific
[docs] def bootstrap(self, data, method, order=None, Nrep=10, alpha=0.05, fs=1, verbose=True, **params): """ Bootstrap - random sampling with replacement of trials for *ConnectAR*. Args: *data* : numpy.array multichannel data matrix *method* : str method of MVAR parametersestimation all avaiable methods you can find in *fitting_algorithms* *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *order* = None : int MVAR model order, if None, it's chosen using default criterion *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ resolution = 100 if 'resolution' in params and params['resolution']: resolution = params['resolution'] for i in range(Nrep): if verbose: print('.', end=' ') if i == 0: tmpsig = self.__calc_multitrial(data, method, order, fs, resolution) fres, k, k = tmpsig.shape signi = np.zeros((Nrep, fres, k, k)) signi[i] = tmpsig else: signi[i] = self.__calc_multitrial(data, method, order, fs, resolution) if verbose: print('|') return self.levels(signi, alpha, k)
[docs] def surrogate(self, data, method, Nrep=10, alpha=0.05, order=None, fs=1, verbose=True, **params): """ Surrogate data testing for *ConnectAR* . Mixing data points in each channel. Significance level in calculated over all *Nrep* surrogate sets. Args: *data* : numpy.array multichannel data matrix *method* : str method of MVAR parameters estimation all avaiable methods you can find in *fitting_algorithms* *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *order* = None : int MVAR model order, if None, it's chosen using default criterion *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ shdata = data.copy() k, N = data.shape resolution = 100 if 'resolution' in params and params['resolution']: resolution = params['resolution'] for i in range(Nrep): if verbose: print('.', end=' ') list(map(np.random.shuffle, shdata)) ar, vr = Mvar().fit(shdata, order, method) if i == 0: rtmp = self.calculate(ar, vr, fs, resolution) reskeeper = np.zeros((Nrep, rtmp.shape[0], k, k)) reskeeper[i] = rtmp continue reskeeper[i] = self.calculate(ar, vr, fs, resolution) if verbose: print('|') return self.levels(reskeeper, alpha, k)
############################ # MVAR based methods:
[docs]def dtf_fun(Acoef, Vcoef, fs, resolution, generalized=False): """ Directed Transfer Function estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *DTF* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] M. Kaminski, K.J. Blinowska. A new method of the description of the information flow. Biol.Cybern. 65:203-210, (1991). """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape DTF = np.zeros((res, k, k)) if generalized: sigma = np.diag(Vcoef) else: sigma = np.ones(k) for i in range(res): mH = sigma*[i], H_z[i].T.conj()).real DTF[i] = (np.sqrt(sigma)*np.abs(H_z[i]))/np.sqrt(np.diag(mH)).reshape((k, 1)) return DTF
[docs]def pdc_fun(Acoef, Vcoef, fs, resolution, generalized=False): """ Partial Directed Coherence estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *PDC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Sameshima, K., Baccala, L. A., Partial directed coherence: a new concept in neural structure determination., 2001, Biol. Cybern. 84, 463–474. """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape PDC = np.zeros((res, k, k)) sigma = np.diag(Vcoef) for i in range(res): mA = (1./sigma[:, None])*[i].T.conj(), A_z[i]).real PDC[i] = np.abs(A_z[i]/np.sqrt(sigma))/np.sqrt(np.diag(mA)) return PDC
[docs]class PartialCoh(ConnectAR): """ PartialCoh - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=None): """ Partial Coherence estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *PC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] G. M. Jenkins, D. G. Watts. Spectral Analysis and its Applications. Holden-Day, USA, 1969 """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape PC = np.zeros((res, k, k)) before = np.ones((k, k)) before[0::2, :] *= -1 before[:, 0::2] *= -1 for i in range(res): D_z = np.linalg.inv(S_z[i]) dd = np.tile(np.diag(D_z), (k, 1)) mD = (dd*dd.T).real PC[i] = -1*before*(np.abs(D_z)/np.sqrt(mD)) return np.abs(PC)
[docs]class PDC(ConnectAR): """ PDC - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`pdc_fun`." return pdc_fun(Acoef, Vcoef, fs, resolution)
[docs]class gPDC(ConnectAR): """ gPDC - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`pdc_fun`" return pdc_fun(Acoef, Vcoef, fs, resolution, generalized=True)
[docs]class DTF(ConnectAR): """ DTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`dtf_fun`." return dtf_fun(Acoef, Vcoef, fs, resolution)
[docs]class gDTF(ConnectAR): """ gDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`dtf_fun`." return dtf_fun(Acoef, Vcoef, fs, resolution, generalized=True)
[docs]class ffDTF(ConnectAR): """ ffDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ full-frequency Directed Transfer Function estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *ffDTF* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Korzeniewska, all. Determination of information flow direction among brain structures by a modified directed transfer function (dDTF) method. J. Neurosci. Methods 125, 195–207 (2003). """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape mH = np.zeros((res, k, k)) for i in range(res): mH[i] = np.abs([i], H_z[i].T.conj())) mHsum = np.sum(mH, axis=0) ffDTF = np.zeros((res, k, k)) for i in range(res): ffDTF[i] = (np.abs(H_z[i]).T/np.sqrt(np.diag(mHsum))).T return ffDTF
[docs]class dDTF(ConnectAR): """ dDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ direct Directed Transfer Function estimation from MVAR parameters. dDTF is a DTF multiplied in each frequency by Patrial Coherence. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *dDTF* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Korzeniewska, all. Determination of information flow direction among brain structures by a modified directed transfer function (dDTF) method. J. Neurosci. Methods 125, 195–207 (2003). """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape mH = np.zeros((res, k, k)) for i in range(res): mH[i] = np.abs([i], H_z[i].T.conj())) mHsum = np.sum(mH, axis=0) dDTF = np.zeros((res, k, k)) before = np.ones((k, k)) before[0::2, :] *= -1 before[:, 0::2] *= -1 for i in range(res): D_z = np.linalg.inv(S_z[i]) dd = np.tile(np.diag(D_z), (k, 1)) mD = (dd*dd.T).real PC = np.abs(-1*before*(np.abs(D_z)/np.sqrt(mD))) dDTF[i] = PC*(np.abs(H_z[i]).T/np.sqrt(np.diag(mHsum))).T return dDTF
class iPDC(ConnectAR): """ iPDC - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """ def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ instantaneous Partial Directed Coherence from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p+1) where *k* is number of channels and *p* is a model order. It's zero lag case. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *iPDC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Erla, S. et all Multivariate Autoregressive Model with Instantaneous Effects to Improve Brain Connectivity Estimation. Int. J. Bioelectromagn. 11, 74–79 (2009). """ B_z = spectrum_inst(Acoef, Vcoef, fs, resolution=resolution) res, k, k = B_z.shape PDC = np.zeros((res, k, k)) for i in range(res): mB =[i].T.conj(), B_z[i]).real PDC[i] = np.abs(B_z[i])/np.sqrt(np.diag(mB)) return PDC class iDTF(ConnectAR): """ iDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """ def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ instantaneous Partial Directed Coherence from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p+1) where *k* is number of channels and *p* is a model order. It's zero lag case. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *iPDC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Erla, S. et all, Multivariate Autoregressive Model with Instantaneous Effects to Improve Brain Connectivity Estimation. Int. J. Bioelectromagn. 11, 74–79 (2009). """ B_z = spectrum_inst(Acoef, Vcoef, fs, resolution=resolution) res, k, k = B_z.shape DTF = np.zeros((res, k, k)) for i in range(res): Hb_z = np.linalg.inv(B_z[i]) mH =, Hb_z.T.conj()).real DTF[i] = np.abs(Hb_z)/np.sqrt(np.diag(mH)).reshape((k, 1)) return DTF ############################ # Fourier Transform based methods:
[docs]class Coherency(Connect): """ Coherency - class inherits from :class:`Connect` and overloads :func:`Connect.calculate` method and *values_range* attribute. """ def __init__(self): self.values_range = [0, 1]
[docs] def calculate(self, data, cnfft=None, cno=None, window=np.hanning, im=False): """ Coherency calculation using FFT mehtod. Args: *data* : numpy.array array of shape (k, N) where *k* is number of channels and *N* is number of data points. *cnfft* = None : int number of data points in window; if None, it is N/5 *cno* = 0 : int overlap; if None, it is N/10 *window* = np.hanning : <function> generating window with 1 arg *n* window function *im* = False : bool if False it return absolute value, otherwise complex number Returns: *COH* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] M. B. Priestley Spectral Analysis and Time Series. Academic Press Inc. (London) LTD., 1981 """ assert cnfft > cno, "overlap must be smaller than window" k, N = data.shape if not cnfft: cnfft = int(N/5) if cno is None: cno = int(N/10) winarr = window(cnfft) slices = range(0, N, int(cnfft-cno)) ftsliced = np.zeros((len(slices), k, int(cnfft/2)+1), complex) for e, i in enumerate(slices): if i+cnfft >= N: datzer = np.concatenate((data[:, i:i+cnfft], np.zeros((k, i+cnfft-N))), axis=1) ftsliced[e] = np.fft.rfft(datzer*winarr, axis=1) else: ftsliced[e] = np.fft.rfft(data[:, i:i+cnfft]*winarr, axis=1) ctop = np.zeros((len(slices), k, k, int(cnfft/2)+1), complex) cdown = np.zeros((len(slices), k, int(cnfft/2)+1)) for i in range(len(slices)): c1 = ftsliced[i, :, :].reshape((k, 1, int(cnfft/2)+1)) c2 = ftsliced[i, :, :].conj().reshape((1, k, int(cnfft/2)+1)) ctop[i] = c1*c2 cdown[i] = np.abs(ftsliced[i, :, :])**2 cd1 = np.mean(cdown, axis=0).reshape((k, 1, int(cnfft/2)+1)) cd2 = np.mean(cdown, axis=0).reshape((1, k, int(cnfft/2)+1)) cdwn = cd1*cd2 coh = np.mean(ctop, axis=0)/np.sqrt(cdwn) if not im: coh = np.abs(coh) return coh.T
[docs]class PSI(Connect): """ PSI - class inherits from :class:`Connect` and overloads :func:`Connect.calculate` method. """ def __init__(self): super(PSI, self).__init__() self.two_sided = True
[docs] def calculate(self, data, band_width=4, psinfft=None, psino=0, window=np.hanning): """ Phase Slope Index calculation using FFT mehtod. Args: *data* : numpy.array array of shape (k, N) where *k* is number of channels and *N* is number of data points. *band_width* = 4 : int width of frequency band where PSI values are summed *psinfft* = None : int number of data points in window; if None, it is N/5 *psino* = 0 : int overlap; if None, it is N/10 *window* = np.hanning : <function> generating window with 1 arg *n* window function Returns: *COH* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Nolte G. et all, Comparison of Granger Causality and Phase Slope Index. 267–276 (2009). """ k, N = data.shape if not psinfft: psinfft = int(N/4) assert psinfft > psino, "overlap must be smaller than window" coh = Coherency() cohval = coh.calculate(data, cnfft=psinfft, cno=psino, window=window, im=True) fq_bands = np.arange(0, int(psinfft/2)+1, band_width) psi = np.zeros((len(fq_bands)-1, k, k)) for f in range(len(fq_bands)-1): ctmp = cohval[fq_bands[f]:fq_bands[f+1], :, :] psi[f] = np.imag(np.sum(ctmp[:-1, :, :].conj()*ctmp[1:, :, :], axis=0)) return psi
[docs]class GCI(Connect): """ GCI - class inherits from :class:`Connect` and overloads :func:`Connect.calculate` method. """ def __init__(self): super(GCI, self).__init__() self.two_sided = False
[docs] def calculate(self, data, gcimethod='yw', gciorder=None): """ Granger Causality Index calculation from MVAR model. Args: *data* : numpy.array array of shape (k, N) where *k* is number of channels and *N* is number of data points. *gcimethod* = 'yw' : int MVAR parameters estimation model *gciorder* = None : int model order, if None appropiate value is chosen basic on default criterion Returns: *gci* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Nolte G. et all, Comparison of Granger Causality and Phase Slope Index. 267–276 (2009). """ k, N = data.shape arfull, vrfull = Mvar().fit(data, gciorder, gcimethod) gcval = np.zeros((k, k)) for i in range(k): arix = [j for j in range(k) if i != j] ar_i, vr_i = Mvar().fit(data[arix, :], gciorder, gcimethod) for e, c in enumerate(arix): gcval[c, i] = np.log(vrfull[i, i]/vr_i[e, e]) return np.tile(gcval, (2, 1, 1))
############################ # Envelopes based methods: class AEC(Connect): """ AEC - class inherits from :class:`Connect` and overloads :func:`Connect.calculate` method. """ def __init__(self): super(AEC, self).__init__() self._freq_bands = FQ_BANDS self.two_sided = True def set_freq_bands(self, bands): "Sets freqnecy bands" problem = False if type(bands) == list: if np.any([len(b) != 2 for b in bands]): problem = True elif type(bands) == np.ndarray: if bands.shape[1] != 2: problem = True if problem: raise ValueError("*bands must be list of pairs or numpy array shaped (n, 2)") else: self._freq_bands = bands @property def freq_bands(self): return _freq_bands def calculate(self, data, fs, bands = None, filter = None): """ Amplitude Envelope Correlations calculation. Args: *data* : numpy.array array of shape (k, N) where *k* is number of channels and *N* is number of data points. *fs* : int sampling rate *bands* : numpy.array or list frequency bands. It must be list of pairs or numpy array shaped (n, 2). Default values are: 'theta': [6, 7], 'alpha': [8, 13], 'beta': [15, 25], 'low-gamma': [30, 45], 'high-gamma': [55, 70]} *filter* : tuple tuple (b, a) consisting of a numerator (b) and a denominator (a) polynomials of the IIR filter. If None, a Butterworth filter of order 4 is used. Returns: *aec* : numpy.array matrix with estimation results (len(bands), k, k) References: .. [1] Bruns, A. et al., Amplitude envelope correlation detects coupling among incoherent brain signals. NeuroReport. 1509-1514 (2000). """ if not bands is None: self.set_freq_bands(bands) k, N = data.shape aecval = np.zeros((len(self._freq_bands), k, k)) for e, band in enumerate(self._freq_bands): if type(band) == str: band = FQ_BANDS[band] filtdata = filter_band(data, fs, band, filter) envelopes = calc_ampenv(filtdata) aecval[e] = np.corrcoef(envelopes) return aecval conn_estim_dc = {'dtf': DTF, 'pdc': PDC, 'ipdc': iPDC, 'psi': PSI, 'ffdtf': ffDTF, 'ddtf': dDTF, 'gdtf': gDTF, 'gpdc': gPDC, 'pcoh': PartialCoh, 'coh': Coherency, 'gci': GCI, 'aec': AEC}