Source code for connectivipy.conn

# -*- coding: utf-8 -*-
#! /usr/bin/env python

from __future__ import absolute_import
from __future__ import print_function

import numpy as np
import scipy.stats as st
from abc import ABCMeta, abstractmethod
from .mvar.comp import ldl
from .mvarmodel import Mvar

import six
from six.moves import map
from six.moves import range
from six.moves import zip

# Spectrum functions:

[docs]def spectrum(acoef, vcoef, fs=1, resolution=100): """ Generating data point from matrix *A* with MVAR coefficients. Args: *acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points Returns: *A_z* : numpy.array z-transformed A(f) complex matrix in shape (*resolution*, k, k) *H_z* : numpy.array inversion of *A_z* *S_z* : numpy.array spectrum matrix (*resolution*, k, k) References: .. [1] K. J. Blinowska, R. Kus, M. Kaminski (2004) “Granger causality and information flow in multivariate processes” Physical Review E 70, 050902. """ p, k, k = acoef.shape freqs = np.linspace(0, fs*0.5, resolution) A_z = np.zeros((len(freqs), k, k), complex) H_z = np.zeros((len(freqs), k, k), complex) S_z = np.zeros((len(freqs), k, k), complex) I = np.eye(k, dtype=complex) for e, f in enumerate(freqs): epot = np.zeros((p, 1), complex) ce = np.exp(-2.j*np.pi*f*(1./fs)) epot[0] = ce for k in range(1, p): epot[k] = epot[k-1]*ce A_z[e] = I - np.sum([epot[x]*acoef[x] for x in range(p)], axis=0) H_z[e] = np.linalg.inv(A_z[e]) S_z[e] =[e], vcoef), H_z[e].T.conj()) return A_z, H_z, S_z
[docs]def spectrum_inst(acoef, vcoef, fs=1, resolution=100): """ Generating data point from matrix *A* with MVAR coefficients taking into account zero-lag effects. Args: *acoef* : numpy.array array of shape (k, k, p+1) where *k* is number of channels and *p* is a model order. acoef[0] - is (k, k) matrix for zero lag, acoef[1] for one data point lag and so on. *vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points Returns: *A_z* : numpy.array z-transformed A(f) complex matrix in shape (*resolution*, k, k) *H_z* : numpy.array inversion of *A_z* *S_z* : numpy.array spectrum matrix (*resolution*, k, k) References: .. [1] Erla S. et all, Multivariate Autoregressive Model with Instantaneous Effects to Improve Brain Connectivity Estimation, Int. J. Bioelectromagn. 11, 74–79 (2009). """ p, k, k = acoef.shape freqs = np.linspace(0, fs/2, resolution) B_z = np.zeros((len(freqs), k, k), complex) L, U, Lt = ldl(vcoef) Linv = np.linalg.inv(L) I = np.eye(k, dtype=complex) bcoef = np.array([, acoef[x]) for x in range(p)]) b0 = np.eye(k) - Linv for e, f in enumerate(freqs): epot = np.zeros((p, 1), complex) ce = np.exp(-2.j*np.pi*f*(1./fs)) epot[0] = ce for k in range(1, p): epot[k] = epot[k-1]*ce B_z[e] = I - b0 - np.sum([epot[x]*bcoef[x] for x in range(p)], axis=0) return B_z
######################################################################## # Connectivity classes: ########################################################################
[docs]class Connect(six.with_metaclass(ABCMeta, object)): """ Abstract class governing calculation of various connectivity estimators with concrete methods: *short_time*, *significance* and abstract *calculate*. """ def __init__(self): self.values_range = [None, None] # normalization bands self.two_sided = False # only positive, or also negative values
[docs] @abstractmethod def calculate(self): """Abstract method to calculate values of estimators from specific parameters""" pass
[docs] def short_time(self, data, nfft=None, no=None, **params): """ Short-tme version of estimator, where data is windowed into parts of length *nfft* and overlap *no*. *params* catch additional parameters specific for estimator. Args: *data* : numpy.array data matrix *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *params* : additional parameters specific for chosen estimator Returns: *stvalues* : numpy.array short time values (time points, frequency, k, k), where k is number of channels """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] if e == 0: rescalc = self.calculate(datcut, **params) stvalues = np.zeros((len(slices), rescalc.shape[0], k, k)) stvalues[e] = rescalc continue stvalues[e] = self.calculate(datcut, **params) return stvalues
[docs] def short_time_significance(self, data, Nrep=10, alpha=0.05, nfft=None, no=None, verbose=True, **params): """ Significance of short-tme versions of estimators. It base on bootstrap :func:`Connect.bootstrap` for multitrial case and surrogate data :func:`Connect.surrogate` for one trial. Args: *data* : numpy.array data matrix *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signi_st* : numpy.array short time significance values in shape of - (tp, k, k) for one sided estimator - (tp, 2, k, k) for two sided where k is number of channels and tp number of time points """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) if self.two_sided: signi_st = np.zeros((len(slices), 2, k, k)) else: signi_st = np.zeros((len(slices), k, k)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] signi_st[e] = self.significance(datcut, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) return signi_st
[docs] def significance(self, data, Nrep=10, alpha=0.05, verbose=True, **params): """ Significance of connectivity estimators. It base on bootstrap :func:`Connect.bootstrap` for multitrial case and surrogate data :func:`Connect.surrogate` for one trial. Args: *data* : numpy.array data matrix *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signific* : numpy.array significance values, check :func:`Connect.levels` """ if data.ndim > 2: signific = self.bootstrap(data, Nrep=10, alpha=alpha, verbose=verbose, **params) else: signific = self.surrogate(data, Nrep=10, alpha=alpha, verbose=verbose, **params) return signific
[docs] def levels(self, signi, alpha, k): """ Levels of significance Args: *signi* : numpy.array bootstraped values of each channel *alpha* : float type I error rate (significance level) - from 0 to 1 - (1-*alpha*) for onesided estimators (e.g. class:`DTF`) - *alpha* and (1-*alpha*) for twosided (e.g. class:`PSI`) *k* : int number of channels Returns: *ficance* : numpy.array maximal value throughout frequency of score at percentile at level 1-*alpha* - (k, k) for one sided estimator - (2, k, k) for two sided """ if self.two_sided: ficance = np.zeros((2, k, k)) else: ficance = np.zeros((k, k)) for i in range(k): for j in range(k): if self.two_sided: ficance[0][i][j] = np.max(st.scoreatpercentile(signi[:, :, i, j], alpha*100, axis=1)) ficance[1][i][j] = np.max(st.scoreatpercentile(signi[:, :, i, j], (1-alpha)*100, axis=1)) else: ficance[i][j] = np.max(st.scoreatpercentile(signi[:, :, i, j], (1-alpha)*100, axis=1)) return ficance
def __calc_multitrial(self, data, **params): "Calc multitrial averaged estimator for :func:`Connect.bootstrap`" trials = data.shape[2] chosen = np.random.randint(trials, size=trials) bc = np.bincount(chosen) idxbc = np.nonzero(bc)[0] flag = True for num, occurence in zip(idxbc, bc[idxbc]): if occurence > 0: trdata = data[:, :, num] if flag: rescalc = self.calculate(trdata, **params)*occurence flag = False continue rescalc += self.calculate(trdata, **params)*occurence return rescalc/trials
[docs] def bootstrap(self, data, Nrep=100, alpha=0.05, verbose=True, **params): """ Bootstrap - random sampling with replacement of trials. Args: *data* : numpy.array multichannel data matrix *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ for i in range(Nrep): if verbose: print('.', end=' ') if i == 0: tmpsig = self.__calc_multitrial(data, **params) fres, k, k = tmpsig.shape signi = np.zeros((Nrep, fres, k, k)) signi[i] = tmpsig else: signi[i] = self.__calc_multitrial(data, **params) if verbose: print('|') return self.levels(signi, alpha, k)
[docs] def surrogate(self, data, Nrep=100, alpha=0.05, verbose=True, **params): """ Surrogate data testing. Mixing data points in each channel. Significance level in calculated over all *Nrep* surrogate sets. Args: *data* : numpy.array multichannel data matrix *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ k, N = data.shape shdata = data.copy() for i in range(Nrep): if verbose: print('.', end=' ') list(map(np.random.shuffle, shdata)) if i == 0: rtmp = self.calculate(data, **params) reskeeper = np.zeros((Nrep, rtmp.shape[0], k, k)) reskeeper[i] = rtmp continue reskeeper[i] = self.calculate(data, **params) if verbose: print('|') return self.levels(reskeeper, alpha, k)
[docs]class ConnectAR(six.with_metaclass(ABCMeta, Connect)): """ Inherits from *Connect* class and governs calculation of various connectivity estimators basing on MVAR model methods. It overloads *short_time*, *significance* methods but *calculate* remains abstract. """ def __init__(self): super(ConnectAR, self).__init__() self.values_range = [0, 1]
[docs] def short_time(self, data, nfft=None, no=None, mvarmethod='yw', order=None, resol=None, fs=1): """ It overloads :class:`ConnectAR` method :func:`Connect.short_time`. Short-tme version of estimator, where data is windowed into parts of length *nfft* and overlap *no*. *params* catch additional parameters specific for estimator. Args: *data* : numpy.array data matrix *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *mvarmethod* = 'yw' : MVAR parameters estimation method all avaiable methods you can find in *fitting_algorithms* *order* = None: MVAR model order; it None, it is set automatically basing on default criterion. *resol* = None: frequency resolution; if None, it is 100. *fs* = 1 : sampling frequency Returns: *stvalues* : numpy.array short time values (time points, frequency, k, k), where k is number of channels """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] ar, vr = Mvar().fit(datcut, order, mvarmethod) if e == 0: rescalc = self.calculate(ar, vr, fs, resol) stvalues = np.zeros((len(slices), rescalc.shape[0], k, k)) stvalues[e] = rescalc continue stvalues[e] = self.calculate(ar, vr, fs, resol) return stvalues
[docs] def short_time_significance(self, data, Nrep=100, alpha=0.05, method='yw', order=None, fs=1, resolution=None, nfft=None, no=None, verbose=True, **params): """ Significance of short-tme versions of estimators. It base on bootstrap :func:`ConnectAR.bootstrap` for multitrial case and surrogate data :func:`ConnectAR.surrogate` for one trial. Args: *data* : numpy.array data matrix *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *method* = 'yw': str method of MVAR parameters estimation all avaiable methods you can find in *fitting_algorithms* *order* = None : int MVAR model order, if None, it's chosen using default criterion *fs* = 1 : int sampling frequency *resolution* = None : int resolution (if None, it's 100 points) *nfft* = None : int window length (if None it's N/5) *no* = None : int overlap length (if None it's N/10) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signi_st* : numpy.array short time significance values in shape of - (tp, k, k) for one sided estimator - (tp, 2, k, k) for two sided where k is number of channels and tp number of time points """ assert nfft > no, "overlap must be smaller than window" if data.ndim > 2: k, N, trls = data.shape else: k, N = data.shape trls = 0 if not nfft: nfft = int(N/5) if not no: no = int(N/10) slices = range(0, N, int(nfft-no)) signi_st = np.zeros((len(slices), k, k)) for e, i in enumerate(slices): if i+nfft >= N: if trls: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N, trls))), axis=1) else: datcut = np.concatenate((data[:, i:i+nfft], np.zeros((k, i+nfft-N))), axis=1) else: datcut = data[:, i:i+nfft] signi_st[e] = self.significance(datcut, method, order=order, resolution=resolution, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) return signi_st
def __calc_multitrial(self, data, method='yw', order=None, fs=1, resolution=None, **params): "Calc multitrial averaged estimator for :func:`ConnectAR.bootstrap`" trials = data.shape[0] chosen = np.random.randint(trials, size=trials) ar, vr = Mvar().fit(data[:, :, chosen], order, method) rescalc = self.calculate(ar, vr, fs, resolution) return rescalc
[docs] def significance(self, data, method, order=None, resolution=None, Nrep=10, alpha=0.05, verbose=True, **params): """ Significance of connectivity estimators. It base on bootstrap :func:`ConnectAR.bootstrap` for multitrial case and surrogate data :func:`ConnectAR.surrogate` for one trial. Args: *data* : numpy.array data matrix *method* = 'yw': str method of MVAR parametersestimation all avaiable methods you can find in *fitting_algorithms* *order* = None : int MVAR model order, if None, it's chosen using default criterion *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *resolution* = None : int resolution (if None, it's 100 points) *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *signi_st* : numpy.array significance values, check :func:`Connect.levels` """ if data.ndim > 2: signific = self.bootstrap(data, method, order=order, resolution=resolution, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) else: signific = self.surrogate(data, method, order=order, resolution=resolution, Nrep=Nrep, alpha=alpha, verbose=verbose, **params) return signific
[docs] def bootstrap(self, data, method, order=None, Nrep=10, alpha=0.05, fs=1, verbose=True, **params): """ Bootstrap - random sampling with replacement of trials for *ConnectAR*. Args: *data* : numpy.array multichannel data matrix *method* : str method of MVAR parametersestimation all avaiable methods you can find in *fitting_algorithms* *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *order* = None : int MVAR model order, if None, it's chosen using default criterion *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ resolution = 100 if 'resolution' in params and params['resolution']: resolution = params['resolution'] for i in range(Nrep): if verbose: print('.', end=' ') if i == 0: tmpsig = self.__calc_multitrial(data, method, order, fs, resolution) fres, k, k = tmpsig.shape signi = np.zeros((Nrep, fres, k, k)) signi[i] = tmpsig else: signi[i] = self.__calc_multitrial(data, method, order, fs, resolution) if verbose: print('|') return self.levels(signi, alpha, k)
[docs] def surrogate(self, data, method, Nrep=10, alpha=0.05, order=None, fs=1, verbose=True, **params): """ Surrogate data testing for *ConnectAR* . Mixing data points in each channel. Significance level in calculated over all *Nrep* surrogate sets. Args: *data* : numpy.array multichannel data matrix *method* : str method of MVAR parameters estimation all avaiable methods you can find in *fitting_algorithms* *Nrep* = 100 : int number of resamples *alpha* = 0.05 : float type I error rate (significance level) *order* = None : int MVAR model order, if None, it's chosen using default criterion *verbose* = True : bool if True it prints dot on every realization, if False it's quiet. *params* : additional parameters specific for chosen estimator Returns: *levelsigni* : numpy.array significance values, check :func:`Connect.levels` """ shdata = data.copy() k, N = data.shape resolution = 100 if 'resolution' in params and params['resolution']: resolution = params['resolution'] for i in range(Nrep): if verbose: print('.', end=' ') list(map(np.random.shuffle, shdata)) ar, vr = Mvar().fit(shdata, order, method) if i == 0: rtmp = self.calculate(ar, vr, fs, resolution) reskeeper = np.zeros((Nrep, rtmp.shape[0], k, k)) reskeeper[i] = rtmp continue reskeeper[i] = self.calculate(ar, vr, fs, resolution) if verbose: print('|') return self.levels(reskeeper, alpha, k)
############################ # MVAR based methods:
[docs]def dtf_fun(Acoef, Vcoef, fs, resolution, generalized=False): """ Directed Transfer Function estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *DTF* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] M. Kaminski, K.J. Blinowska. A new method of the description of the information flow. Biol.Cybern. 65:203-210, (1991). """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape DTF = np.zeros((res, k, k)) if generalized: sigma = np.diag(Vcoef) else: sigma = np.ones(k) for i in range(res): mH = sigma*[i], H_z[i].T.conj()).real DTF[i] = (np.sqrt(sigma)*np.abs(H_z[i]))/np.sqrt(np.diag(mH)).reshape((k, 1)) return DTF
[docs]def pdc_fun(Acoef, Vcoef, fs, resolution, generalized=False): """ Partial Directed Coherence estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *PDC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Sameshima, K., Baccala, L. A., Partial directed coherence: a new concept in neural structure determination., 2001, Biol. Cybern. 84, 463–474. """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape PDC = np.zeros((res, k, k)) sigma = np.diag(Vcoef) for i in range(res): mA = (1./sigma[:, None])*[i].T.conj(), A_z[i]).real PDC[i] = np.abs(A_z[i]/np.sqrt(sigma))/np.sqrt(np.diag(mA)) return PDC
[docs]class PartialCoh(ConnectAR): """ PartialCoh - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=None): """ Partial Coherence estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *PC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] G. M. Jenkins, D. G. Watts. Spectral Analysis and its Applications. Holden-Day, USA, 1969 """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape PC = np.zeros((res, k, k)) before = np.ones((k, k)) before[0::2, :] *= -1 before[:, 0::2] *= -1 for i in range(res): D_z = np.linalg.inv(S_z[i]) dd = np.tile(np.diag(D_z), (k, 1)) mD = (dd*dd.T).real PC[i] = -1*before*(np.abs(D_z)/np.sqrt(mD)) return np.abs(PC)
[docs]class PDC(ConnectAR): """ PDC - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`pdc_fun`." return pdc_fun(Acoef, Vcoef, fs, resolution)
[docs]class gPDC(ConnectAR): """ gPDC - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`pdc_fun`" return pdc_fun(Acoef, Vcoef, fs, resolution, generalized=True)
[docs]class DTF(ConnectAR): """ DTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`dtf_fun`." return dtf_fun(Acoef, Vcoef, fs, resolution)
[docs]class gDTF(ConnectAR): """ gDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): "More in :func:`dtf_fun`." return dtf_fun(Acoef, Vcoef, fs, resolution, generalized=True)
[docs]class ffDTF(ConnectAR): """ ffDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ full-frequency Directed Transfer Function estimation from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *ffDTF* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Korzeniewska, all. Determination of information flow direction among brain structures by a modified directed transfer function (dDTF) method. J. Neurosci. Methods 125, 195–207 (2003). """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape mH = np.zeros((res, k, k)) for i in range(res): mH[i] = np.abs([i], H_z[i].T.conj())) mHsum = np.sum(mH, axis=0) ffDTF = np.zeros((res, k, k)) for i in range(res): ffDTF[i] = (np.abs(H_z[i]).T/np.sqrt(np.diag(mHsum))).T return ffDTF
[docs]class dDTF(ConnectAR): """ dDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """
[docs] def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ direct Directed Transfer Function estimation from MVAR parameters. dDTF is a DTF multiplied in each frequency by Patrial Coherence. Args: *Acoef* : numpy.array array of shape (k, k, p) where *k* is number of channels and *p* is a model order. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *dDTF* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Korzeniewska, all. Determination of information flow direction among brain structures by a modified directed transfer function (dDTF) method. J. Neurosci. Methods 125, 195–207 (2003). """ A_z, H_z, S_z = spectrum(Acoef, Vcoef, fs, resolution=resolution) res, k, k = A_z.shape mH = np.zeros((res, k, k)) for i in range(res): mH[i] = np.abs([i], H_z[i].T.conj())) mHsum = np.sum(mH, axis=0) dDTF = np.zeros((res, k, k)) before = np.ones((k, k)) before[0::2, :] *= -1 before[:, 0::2] *= -1 for i in range(res): D_z = np.linalg.inv(S_z[i]) dd = np.tile(np.diag(D_z), (k, 1)) mD = (dd*dd.T).real PC = np.abs(-1*before*(np.abs(D_z)/np.sqrt(mD))) dDTF[i] = PC*(np.abs(H_z[i]).T/np.sqrt(np.diag(mHsum))).T return dDTF
class iPDC(ConnectAR): """ iPDC - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """ def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ instantaneous Partial Directed Coherence from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p+1) where *k* is number of channels and *p* is a model order. It's zero lag case. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *iPDC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Erla, S. et all Multivariate Autoregressive Model with Instantaneous Effects to Improve Brain Connectivity Estimation. Int. J. Bioelectromagn. 11, 74–79 (2009). """ B_z = spectrum_inst(Acoef, Vcoef, fs, resolution=resolution) res, k, k = B_z.shape PDC = np.zeros((res, k, k)) for i in range(res): mB =[i].T.conj(), B_z[i]).real PDC[i] = np.abs(B_z[i])/np.sqrt(np.diag(mB)) return PDC class iDTF(ConnectAR): """ iDTF - class inherits from :class:`ConnectAR` and overloads :func:`Connect.calculate` method. """ def calculate(self, Acoef=None, Vcoef=None, fs=None, resolution=100): """ instantaneous Partial Directed Coherence from MVAR parameters. Args: *Acoef* : numpy.array array of shape (k, k, p+1) where *k* is number of channels and *p* is a model order. It's zero lag case. *Vcoef* : numpy.array prediction error matrix (k, k) *fs* = 1 : int sampling rate *resolution* = 100 : int number of spectrum data points *generalized* = False : bool generalized version or not Returns: *iPDC* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Erla, S. et all, Multivariate Autoregressive Model with Instantaneous Effects to Improve Brain Connectivity Estimation. Int. J. Bioelectromagn. 11, 74–79 (2009). """ B_z = spectrum_inst(Acoef, Vcoef, fs, resolution=resolution) res, k, k = B_z.shape DTF = np.zeros((res, k, k)) for i in range(res): Hb_z = np.linalg.inv(B_z[i]) mH =, Hb_z.T.conj()).real DTF[i] = np.abs(Hb_z)/np.sqrt(np.diag(mH)).reshape((k, 1)) return DTF ############################ # Fourier Transform based methods:
[docs]class Coherency(Connect): """ Coherency - class inherits from :class:`Connect` and overloads :func:`Connect.calculate` method and *values_range* attribute. """ def __init__(self): self.values_range = [0, 1]
[docs] def calculate(self, data, cnfft=None, cno=None, window=np.hanning, im=False): """ Coherency calculation using FFT mehtod. Args: *data* : numpy.array array of shape (k, N) where *k* is number of channels and *N* is number of data points. *cnfft* = None : int number of data points in window; if None, it is N/5 *cno* = 0 : int overlap; if None, it is N/10 *window* = np.hanning : <function> generating window with 1 arg *n* window function *im* = False : bool if False it return absolute value, otherwise complex number Returns: *COH* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] M. B. Priestley Spectral Analysis and Time Series. Academic Press Inc. (London) LTD., 1981 """ assert cnfft>cno, "overlap must be smaller than window" k, N = data.shape if not cnfft: cnfft = int(N/5) if cno is None: cno = int(N/10) winarr = window(cnfft) slices = range(0, N, int(cnfft-cno)) ftsliced = np.zeros((len(slices), k, int(cnfft/2)+1), complex) for e, i in enumerate(slices): if i+cnfft >= N: datzer = np.concatenate((data[:, i:i+cnfft], np.zeros((k, i+cnfft-N))), axis=1) ftsliced[e] = np.fft.rfft(datzer*winarr, axis=1) else: ftsliced[e] = np.fft.rfft(data[:, i:i+cnfft]*winarr, axis=1) ctop = np.zeros((len(slices), k, k, int(cnfft/2)+1), complex) cdown = np.zeros((len(slices), k, int(cnfft/2)+1)) for i in range(len(slices)): c1 = ftsliced[i, :, :].reshape((k, 1, int(cnfft/2)+1)) c2 = ftsliced[i, :, :].conj().reshape((1, k, int(cnfft/2)+1)) ctop[i] = c1*c2 cdown[i] = np.abs(ftsliced[i, :, :])**2 cd1 = np.mean(cdown, axis=0).reshape((k, 1, int(cnfft/2)+1)) cd2 = np.mean(cdown, axis=0).reshape((1, k, int(cnfft/2)+1)) cdwn = cd1*cd2 coh = np.mean(ctop, axis=0)/np.sqrt(cdwn) if not im: coh = np.abs(coh) return coh.T
[docs]class PSI(Connect): """ PSI - class inherits from :class:`Connect` and overloads :func:`Connect.calculate` method. """ def __init__(self): super(PSI, self).__init__() self.two_sided = True
[docs] def calculate(self, data, band_width=4, psinfft=None, psino=0, window=np.hanning): """ Phase Slope Index calculation using FFT mehtod. Args: *data* : numpy.array array of shape (k, N) where *k* is number of channels and *N* is number of data points. *band_width* = 4 : int width of frequency band where PSI values are summed *psinfft* = None : int number of data points in window; if None, it is N/5 *psino* = 0 : int overlap; if None, it is N/10 *window* = np.hanning : <function> generating window with 1 arg *n* window function Returns: *COH* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Nolte G. et all, Comparison of Granger Causality and Phase Slope Index. 267–276 (2009). """ k, N = data.shape if not psinfft: psinfft = int(N/4) assert psinfft > psino, "overlap must be smaller than window" coh = Coherency() cohval = coh.calculate(data, cnfft=psinfft, cno=psino, window=window, im=True) fq_bands = np.arange(0, int(psinfft/2)+1, band_width) psi = np.zeros((len(fq_bands)-1, k, k)) for f in range(len(fq_bands)-1): ctmp = cohval[fq_bands[f]:fq_bands[f+1], :, :] psi[f] = np.imag(np.sum(ctmp[:-1, :, :].conj()*ctmp[1:, :, :], axis=0)) return psi
[docs]class GCI(Connect): """ GCI - class inherits from :class:`Connect` and overloads :func:`Connect.calculate` method. """ def __init__(self): super(GCI, self).__init__() self.two_sided = True
[docs] def calculate(self, data, gcimethod='yw', gciorder=None): """ Granger Causality Index calculation from MVAR model. Args: *data* : numpy.array array of shape (k, N) where *k* is number of channels and *N* is number of data points. *gcimethod* = 'yw' : int MVAR parameters estimation model *gciorder* = None : int model order, if None appropiate value is chosen basic on default criterion Returns: *gci* : numpy.array matrix with estimation results (*resolution*, k, k) References: .. [1] Nolte G. et all, Comparison of Granger Causality and Phase Slope Index. 267–276 (2009). """ k, N = data.shape arfull, vrfull = Mvar().fit(data, gciorder, gcimethod) gcval = np.zeros((k, k)) for i in range(k): arix = [j for j in range(k) if i != j] ar_i, vr_i = Mvar().fit(data[arix, :], gciorder, gcimethod) for e, c in enumerate(arix): gcval[c, i] = np.log(vrfull[i, i]/vr_i[e, e]) return np.tile(gcval, (2, 1, 1))
conn_estim_dc = {'dtf': DTF, 'pdc': PDC, 'ipdc': iPDC, 'psi': PSI, 'ffdtf': ffDTF, 'ddtf': dDTF, 'gdtf': gDTF, 'gpdc': gPDC, 'pcoh': PartialCoh, 'coh': Coherency, 'gci': GCI, }