Simulation

The simulate module provides tools for generating synthetic M/EEG data with known ground truth source activations. This is essential for validating and comparing inverse methods under controlled conditions.

Overview

Simulation in invertmeeg supports:

  • Configurable source patterns: Single dipoles, extended patches, or multiple simultaneous sources
  • Noise models: White noise, colored noise, and realistic sensor noise
  • Covariance structures: Correlated sources for testing robustness
  • Batch generation: Efficient generation of many samples for training or benchmarking

Quick Start

from invert.simulate import SimulationConfig, SimulationGenerator

# Create a simulation configuration
config = SimulationConfig(
    n_sources=2,
    snr_range=(5.0, 5.0),  # fixed SNR of 5 dB
    n_timepoints=20,
)

# Generate one batch: generate() yields (x, y, info) tuples
generator = SimulationGenerator(forward, config)  # forward: an MNE forward solution
x, y, info = next(generator.generate())

API Reference

SimulationConfig

Bases: BaseModel

Configuration for EEG simulation generator.

Attributes:

  • batch_size: Number of simulations per batch
  • batch_repetitions: Number of times to repeat each batch
  • n_sources: Number of active sources (int or tuple for range)
  • n_orders: Smoothness order(s) for spatial patterns
  • amplitude_range: Min/max amplitude for source activity
  • n_timepoints: Number of time samples per simulation
  • snr_range: Signal-to-noise ratio range in dB
  • n_timecourses: Number of pre-generated timecourses
  • beta_range: Power-law exponent range for 1/f noise
  • add_forward_error: Whether to add perturbations to leadfield
  • forward_error: Magnitude of forward model error
  • inter_source_correlation: Correlation between sources (float or range)
  • diffusion_smoothing: Whether to use diffusion-based smoothing
  • diffusion_parameter: Smoothing strength (alpha parameter)
  • correlation_mode: Spatial noise correlation pattern
  • noise_color_coeff: Spatial noise correlation strength
  • random_seed: Random seed for reproducibility
  • normalize_leadfield: Whether to normalize leadfield columns
  • verbose: Verbosity level
  • simulation_mode: Simulation mode ('patches' or 'mixture')
  • background_beta: 1/f^beta exponent for smooth background
  • background_mixture_alpha: Mixing coefficient alpha (higher = more background)

Source code in invert/simulate/config.py
class SimulationConfig(BaseModel):
    """Configuration for EEG simulation generator.

    Attributes:
        batch_size: Number of simulations per batch
        batch_repetitions: Number of times to repeat each batch
        n_sources: Number of active sources (int or tuple for range)
        n_orders: Smoothness order(s) for spatial patterns
        amplitude_range: Min/max amplitude for source activity
        n_timepoints: Number of time samples per simulation
        snr_range: Signal-to-noise ratio range in dB
        n_timecourses: Number of pre-generated timecourses
        beta_range: Power-law exponent range for 1/f noise
        add_forward_error: Whether to add perturbations to leadfield
        forward_error: Magnitude of forward model error
        inter_source_correlation: Correlation between sources (float or range)
        diffusion_smoothing: Whether to use diffusion-based smoothing
        diffusion_parameter: Smoothing strength (alpha parameter)
        correlation_mode: Spatial noise correlation pattern
        noise_color_coeff: Spatial noise correlation strength
        random_seed: Random seed for reproducibility
        normalize_leadfield: Whether to normalize leadfield columns
        verbose: Verbosity level
        simulation_mode: Simulation mode ('patches' or 'mixture')
        background_beta: 1/f^beta exponent for smooth background
        background_mixture_alpha: Mixing coefficient alpha (higher = more background)
    """

    batch_size: int = Field(default=1284, ge=1)
    batch_repetitions: int = Field(default=1, ge=1)
    n_sources: Union[int, tuple[int, int]] = Field(
        default=(1, 5), description="Single value or (min, max) tuple"
    )
    n_orders: Union[int, tuple[int, int]] = Field(
        default=(0, 3), description="Smoothness order or (min, max) tuple"
    )
    amplitude_range: tuple[float, float] = Field(
        default=(0.5, 1.0), description="Source amplitude range"
    )
    n_timepoints: int = Field(default=20, ge=1)
    snr_range: tuple[float, float] = Field(
        default=(-5.0, 5.0), description="SNR range in dB"
    )
    n_timecourses: int = Field(default=5000, ge=1)
    beta_range: tuple[float, float] = Field(default=(0.0, 3.0))
    add_forward_error: bool = Field(default=False)
    forward_error: float = Field(default=0.1, ge=0.0)
    inter_source_correlation: Union[float, tuple[float, float]] = Field(
        default=(0.25, 0.75)
    )
    diffusion_smoothing: bool = Field(default=True)
    diffusion_parameter: float = Field(default=0.1, ge=0.0)
    correlation_mode: Optional[
        Union[Literal["auto", "cholesky", "banded", "diagonal"], None]
    ] = Field(default=None)
    noise_color_coeff: Union[float, tuple[float, float]] = Field(default=(0.25, 0.75))
    random_seed: Optional[int] = Field(default=None)
    normalize_leadfield: bool = Field(default=False)
    verbose: int = Field(default=0, ge=0)

    # Mixture mode parameters (simple background + patches)
    simulation_mode: Literal["patches", "mixture"] = Field(
        default="patches",
        description="Simulation mode: 'patches' (sparse only) or 'mixture' (smooth background + patches)",
    )
    background_beta: Union[float, tuple[float, float]] = Field(
        default=(0.5, 2.0),
        description="1/f^beta exponent for smooth background temporal dynamics",
    )
    background_mixture_alpha: Union[float, tuple[float, float]] = Field(
        default=(0.7, 0.9),
        description="Mixing coefficient alpha: y = alpha*y_background + (1-alpha)*y_patches",
    )

    model_config = ConfigDict(frozen=False)
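
Several fields (n_sources, n_orders, inter_source_correlation, noise_color_coeff, background_beta) accept either a fixed scalar or a (min, max) tuple; tuples are resampled uniformly per simulation. A small sketch of that convention (make_sampler is an illustrative helper mirroring _setup_correlation_samplers, not library API):

```python
import numpy as np

def make_sampler(param, rng):
    """Mirror the config's scalar-or-range convention:
    a (min, max) tuple is sampled uniformly, a scalar stays constant."""
    if isinstance(param, (tuple, list)):
        return lambda n=1: rng.uniform(param[0], param[1], n)
    return lambda n=1: np.full(n, param)

rng = np.random.default_rng(0)
snr_fixed = make_sampler(5.0, rng)          # every simulation gets SNR 5 dB
snr_range = make_sampler((-5.0, 5.0), rng)  # SNR drawn fresh per simulation

print(snr_fixed(3))   # [5. 5. 5.]
print(snr_range(3))   # three values in [-5, 5)
```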

SimulationGenerator

Class-based EEG simulation generator with precomputed components.

This generator creates realistic EEG simulations by:

  1. Generating spatially smooth source patterns
  2. Assigning colored noise timecourses to sources
  3. Projecting through the leadfield matrix
  4. Adding spatially/temporally colored sensor noise

The class precomputes spatial smoothing operators and timecourses during initialization for faster batch generation.
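
The four steps can be sketched end-to-end with plain NumPy on a toy problem (all array sizes, the Gaussian patch, and the smoothing kernel here are illustrative stand-ins, not the library's actual operators):

```python
import numpy as np

rng = np.random.default_rng(0)
n_chans, n_dipoles, n_times = 32, 200, 20

# Toy leadfield (in practice taken from fwd["sol"]["data"])
L = rng.standard_normal((n_chans, n_dipoles))

# 1. Spatially smooth source pattern: a Gaussian patch around a random center
center = rng.integers(n_dipoles)
pattern = np.exp(-0.5 * ((np.arange(n_dipoles) - center) / 5.0) ** 2)

# 2. Colored timecourse: smoothed white noise as a simple stand-in
tc = np.convolve(rng.standard_normal(n_times + 4), np.ones(5) / 5, mode="valid")

# 3. Project through the leadfield
y = pattern[:, None] * tc[None, :]   # [n_dipoles, n_times]
x_clean = L @ y                      # [n_chans, n_times]

# 4. Add sensor noise at a target SNR (rms ratio, as in add_white_noise)
snr_db = 5.0
rms = lambda a: np.sqrt(np.mean(a ** 2))
noise = rng.standard_normal(x_clean.shape)
noise *= rms(x_clean) / (10 ** (snr_db / 10) * rms(noise))
x = x_clean + noise
```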

Source code in invert/simulate/simulate.py
class SimulationGenerator:
    """Class-based EEG simulation generator with precomputed components.

    This generator creates realistic EEG simulations by:
    1. Generating spatially smooth source patterns
    2. Assigning colored noise timecourses to sources
    3. Projecting through the leadfield matrix
    4. Adding spatially/temporally colored sensor noise

    The class precomputes spatial smoothing operators and timecourses
    during initialization for faster batch generation.
    """

    def __init__(self, fwd, config: SimulationConfig | None = None, **kwargs):
        """Initialize the simulation generator.

        Parameters:
            fwd: MNE forward solution object
            config: SimulationConfig instance (optional)
            **kwargs: Configuration parameters (used if config is None)
        """
        # Initialize configuration
        if config is None:
            self.config = SimulationConfig(**kwargs)
        else:
            self.config = config

        # Store forward solution components
        self.fwd = fwd
        self.channel_types = np.array(fwd["info"].get_channel_types())
        self.leadfield_original = deepcopy(fwd["sol"]["data"])
        self.leadfield = deepcopy(self.leadfield_original)
        self.n_chans, self.n_dipoles = self.leadfield.shape

        # Initialize random number generator
        self.rng = np.random.default_rng(self.config.random_seed)

        # Parse n_sources parameter
        if isinstance(self.config.n_sources, (int, float)):
            n_sources_val = np.clip(self.config.n_sources, a_min=1, a_max=np.inf)
            self.min_sources, self.max_sources = int(n_sources_val), int(n_sources_val)
        else:
            self.min_sources, self.max_sources = self.config.n_sources

        # Precompute spatial smoothing operator
        self._precompute_spatial_operators()

        # Precompute timecourses
        self._precompute_timecourses()

        # Setup correlation sampling functions
        self._setup_correlation_samplers()

        # Normalize leadfield if requested
        if self.config.normalize_leadfield:
            self.leadfield /= np.linalg.norm(self.leadfield, axis=0)

    def _precompute_spatial_operators(self):
        """Precompute graph Laplacian and smoothing operators."""
        self.adjacency = build_adjacency(self.fwd, verbose=self.config.verbose)

        # Parse n_orders parameter
        if isinstance(self.config.n_orders, (tuple, list)):
            self.min_order, self.max_order = self.config.n_orders
            if self.min_order == self.max_order:
                self.max_order += 1
        else:
            self.min_order = 0
            self.max_order = self.config.n_orders

        self.sources, self.sources_dense, self.gradient = build_spatial_basis(
            self.adjacency,
            self.n_dipoles,
            self.min_order,
            self.max_order,
            diffusion_smoothing=self.config.diffusion_smoothing,
            diffusion_parameter=self.config.diffusion_parameter,
        )
        self.n_candidates = self.sources.shape[0]

    def _precompute_timecourses(self):
        """Precompute colored noise timecourses."""
        betas = self.rng.uniform(*self.config.beta_range, self.config.n_timecourses)

        time_courses = powerlaw_noise(
            betas,
            self.config.n_timepoints,
            n_signals=self.config.n_timecourses,
            rng=self.rng,
        )

        # Normalize to max(abs()) == 1
        self.time_courses = (time_courses.T / abs(time_courses).max(axis=1)).T

    def _setup_correlation_samplers(self):
        """Setup sampling functions for correlation parameters."""
        isc = self.config.inter_source_correlation
        if isinstance(isc, (tuple, list)):
            self.get_inter_source_correlation = lambda n=1: self.rng.uniform(
                isc[0], isc[1], n
            )
        else:
            self.get_inter_source_correlation = lambda n=1: np.full(n, isc)

        ncc = self.config.noise_color_coeff
        if isinstance(ncc, (tuple, list)):
            self.get_noise_color_coeff = lambda n=1: self.rng.uniform(ncc[0], ncc[1], n)
        else:
            self.get_noise_color_coeff = lambda n=1: np.full(n, ncc)

    def _generate_smooth_background(self, batch_size):
        """Generate smooth background activity with 1/f^beta temporal dynamics.

        Uses vectorized FFT-based colored noise generation instead of
        per-dipole Python loops.

        Parameters:
            batch_size: Number of simulations to generate

        Returns:
            y_background: [batch_size, n_dipoles, n_timepoints] background activity
        """
        # Sample beta parameters for background
        if isinstance(self.config.background_beta, tuple):
            betas = self.rng.uniform(*self.config.background_beta, batch_size)
        else:
            betas = np.full(batch_size, self.config.background_beta)

        y_background_all = np.empty(
            (batch_size, self.n_dipoles, self.config.n_timepoints)
        )

        for b_idx, beta in enumerate(betas):
            # Vectorized: generate all dipole timecourses at once
            background_timecourses = powerlaw_noise(
                beta, self.config.n_timepoints, n_signals=self.n_dipoles, rng=self.rng
            )  # [n_dipoles, n_timepoints]

            # Apply spatial smoothing using gradient operator
            background_smooth = self.gradient @ background_timecourses

            # Normalize
            max_val = np.max(np.abs(background_smooth))
            if max_val > 0:
                background_smooth = background_smooth / max_val
            y_background_all[b_idx] = background_smooth

        return y_background_all

    def _setup_leadfield(self):
        """Get the leadfield matrix, optionally with forward model error."""
        if self.config.add_forward_error:
            return add_error(
                self.leadfield_original,
                self.config.forward_error,
                self.gradient,
                self.rng,
            )
        return self.leadfield

    def _generate_patches(self, batch_size):
        """Generate patch-based source activity.

        Returns:
            y_patches: [batch_size, n_dipoles, n_timepoints] patch activity
            selection: list of source index arrays
            amplitude_values: list of amplitude arrays
            inter_source_correlations: array of correlation values
            noise_color_coeffs: array of noise color coefficients
        """
        n_sources_batch = self.rng.integers(
            self.min_sources, self.max_sources + 1, batch_size
        )

        # Select source locations
        selection = [
            self.rng.integers(0, self.n_candidates, n) for n in n_sources_batch
        ]

        # Sample amplitudes and timecourses
        amplitude_values = [
            self.rng.uniform(*self.config.amplitude_range, n) for n in n_sources_batch
        ]
        timecourse_choices = [
            self.rng.choice(self.config.n_timecourses, n) for n in n_sources_batch
        ]
        amplitudes = [self.time_courses[choice].T for choice in timecourse_choices]

        # Apply inter-source correlations
        inter_source_correlations = self.get_inter_source_correlation(n=batch_size)
        noise_color_coeffs = self.get_noise_color_coeff(n=batch_size)

        source_covariances = [
            get_cov(n, isc)
            for n, isc in zip(n_sources_batch, inter_source_correlations)
        ]
        amplitudes = [
            amp @ np.diag(amplitude_values[i]) @ cov
            for i, (amp, cov) in enumerate(zip(amplitudes, source_covariances))
        ]

        # Generate patch activity using dense source matrix for fast indexing
        y_patches = np.stack(
            [
                (amplitudes[i] @ self.sources_dense[selection[i]]).T
                / len(amplitudes[i])
                for i in range(batch_size)
            ],
            axis=0,
        )

        return (
            y_patches,
            n_sources_batch,
            selection,
            amplitude_values,
            inter_source_correlations,
            noise_color_coeffs,
        )

    def _generate_background(self, batch_size, y_patches):
        """Mix background activity with patches if in mixture mode.

        Returns:
            y: [batch_size, n_dipoles, n_timepoints] combined activity
            alphas: mixing coefficients or None
        """
        if self.config.simulation_mode == "mixture":
            y_background = self._generate_smooth_background(batch_size)

            if isinstance(self.config.background_mixture_alpha, tuple):
                alphas = self.rng.uniform(
                    *self.config.background_mixture_alpha, batch_size
                )
            else:
                alphas = np.full(batch_size, self.config.background_mixture_alpha)

            alphas_bc = alphas[:, np.newaxis, np.newaxis]
            y = alphas_bc * y_background + (1 - alphas_bc) * y_patches
        else:
            y = y_patches
            alphas = None

        return y, alphas

    def _apply_noise(self, x, batch_size, noise_color_coeffs, modes_batch):
        """Apply sensor noise to EEG data.

        Returns:
            x: [batch_size, n_channels, n_timepoints] noisy EEG data
            snr_levels: array of SNR values used
        """
        snr_levels = self.rng.uniform(
            low=self.config.snr_range[0],
            high=self.config.snr_range[1],
            size=batch_size,
        )

        x = np.stack(
            [
                add_white_noise(
                    xx,
                    snr_level,
                    self.rng,
                    self.channel_types,
                    correlation_mode=corr_mode,
                    noise_color_coeff=noise_color_level,
                )
                for (xx, snr_level, corr_mode, noise_color_level) in zip(
                    x, snr_levels, modes_batch, noise_color_coeffs
                )
            ],
            axis=0,
        )

        return x, snr_levels

    def _build_metadata(
        self,
        batch_size,
        n_sources_batch,
        amplitude_values,
        snr_levels,
        inter_source_correlations,
        noise_color_coeffs,
        selection,
        alphas,
    ):
        """Build simulation metadata DataFrame."""
        info_dict = {
            "n_sources": n_sources_batch,
            "amplitudes": amplitude_values,
            "snr": snr_levels,
            "inter_source_correlations": inter_source_correlations,
            "n_orders": [[self.min_order, self.max_order]] * batch_size,
            "diffusion_parameter": [self.config.diffusion_parameter] * batch_size,
            "n_timepoints": [self.config.n_timepoints] * batch_size,
            "n_timecourses": [self.config.n_timecourses] * batch_size,
            "correlation_mode": [self.config.correlation_mode] * batch_size,
            "noise_color_coeff": noise_color_coeffs,
            "centers": selection,
            "simulation_mode": [self.config.simulation_mode] * batch_size,
        }

        if self.config.simulation_mode == "mixture":
            info_dict.update(
                {
                    "background_beta": [self.config.background_beta] * batch_size,
                    "background_mixture_alpha": alphas,
                }
            )

        return pd.DataFrame(info_dict)

    def generate(self):
        """Generate batches of simulations.

        Yields:
            tuple: (x, y, info) where:
                - x: EEG data [batch_size, n_channels, n_timepoints]
                - y: Source activity [batch_size, n_dipoles, n_timepoints] (scaled)
                - info: DataFrame with simulation metadata
        """
        # Setup correlation modes
        if (
            isinstance(self.config.correlation_mode, str)
            and self.config.correlation_mode.lower() == "auto"
        ):
            correlation_modes = ["cholesky", "banded", "diagonal", None]
            modes_batch = self.rng.choice(correlation_modes, self.config.batch_size)
        else:
            modes_batch = [self.config.correlation_mode] * self.config.batch_size

        while True:
            leadfield = self._setup_leadfield()

            (
                y_patches,
                n_sources_batch,
                selection,
                amplitude_values,
                inter_source_correlations,
                noise_color_coeffs,
            ) = self._generate_patches(self.config.batch_size)

            y, alphas = self._generate_background(self.config.batch_size, y_patches)

            # Vectorized leadfield projection
            x = np.einsum("cd,bdt->bct", leadfield, y)

            x, snr_levels = self._apply_noise(
                x, self.config.batch_size, noise_color_coeffs, modes_batch
            )

            info = self._build_metadata(
                self.config.batch_size,
                n_sources_batch,
                amplitude_values,
                snr_levels,
                inter_source_correlations,
                noise_color_coeffs,
                selection,
                alphas,
            )

            output = (x, y, info)

            for _ in range(self.config.batch_repetitions):
                yield output

__init__

__init__(
    fwd, config: SimulationConfig | None = None, **kwargs
)

Initialize the simulation generator.

Parameters:

  • fwd: MNE forward solution object
  • config: SimulationConfig instance (optional)
  • **kwargs: Configuration parameters (used if config is None)

Source code in invert/simulate/simulate.py
def __init__(self, fwd, config: SimulationConfig | None = None, **kwargs):
    """Initialize the simulation generator.

    Parameters:
        fwd: MNE forward solution object
        config: SimulationConfig instance (optional)
        **kwargs: Configuration parameters (used if config is None)
    """
    # Initialize configuration
    if config is None:
        self.config = SimulationConfig(**kwargs)
    else:
        self.config = config

    # Store forward solution components
    self.fwd = fwd
    self.channel_types = np.array(fwd["info"].get_channel_types())
    self.leadfield_original = deepcopy(fwd["sol"]["data"])
    self.leadfield = deepcopy(self.leadfield_original)
    self.n_chans, self.n_dipoles = self.leadfield.shape

    # Initialize random number generator
    self.rng = np.random.default_rng(self.config.random_seed)

    # Parse n_sources parameter
    if isinstance(self.config.n_sources, (int, float)):
        n_sources_val = np.clip(self.config.n_sources, a_min=1, a_max=np.inf)
        self.min_sources, self.max_sources = int(n_sources_val), int(n_sources_val)
    else:
        self.min_sources, self.max_sources = self.config.n_sources

    # Precompute spatial smoothing operator
    self._precompute_spatial_operators()

    # Precompute timecourses
    self._precompute_timecourses()

    # Setup correlation sampling functions
    self._setup_correlation_samplers()

    # Normalize leadfield if requested
    if self.config.normalize_leadfield:
        self.leadfield /= np.linalg.norm(self.leadfield, axis=0)

generate

generate()

Generate batches of simulations.

Yields:

  tuple: (x, y, info) where:

  • x: EEG data [batch_size, n_channels, n_timepoints]
  • y: Source activity [batch_size, n_dipoles, n_timepoints] (scaled)
  • info: DataFrame with simulation metadata

Source code in invert/simulate/simulate.py
def generate(self):
    """Generate batches of simulations.

    Yields:
        tuple: (x, y, info) where:
            - x: EEG data [batch_size, n_channels, n_timepoints]
            - y: Source activity [batch_size, n_dipoles, n_timepoints] (scaled)
            - info: DataFrame with simulation metadata
    """
    # Setup correlation modes
    if (
        isinstance(self.config.correlation_mode, str)
        and self.config.correlation_mode.lower() == "auto"
    ):
        correlation_modes = ["cholesky", "banded", "diagonal", None]
        modes_batch = self.rng.choice(correlation_modes, self.config.batch_size)
    else:
        modes_batch = [self.config.correlation_mode] * self.config.batch_size

    while True:
        leadfield = self._setup_leadfield()

        (
            y_patches,
            n_sources_batch,
            selection,
            amplitude_values,
            inter_source_correlations,
            noise_color_coeffs,
        ) = self._generate_patches(self.config.batch_size)

        y, alphas = self._generate_background(self.config.batch_size, y_patches)

        # Vectorized leadfield projection
        x = np.einsum("cd,bdt->bct", leadfield, y)

        x, snr_levels = self._apply_noise(
            x, self.config.batch_size, noise_color_coeffs, modes_batch
        )

        info = self._build_metadata(
            self.config.batch_size,
            n_sources_batch,
            amplitude_values,
            snr_levels,
            inter_source_correlations,
            noise_color_coeffs,
            selection,
            alphas,
        )

        output = (x, y, info)

        for _ in range(self.config.batch_repetitions):
            yield output
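
generate() is an infinite generator: each freshly simulated batch is yielded batch_repetitions times, so consumers typically pull batches with next() or itertools.islice. A toy stand-in (toy_generate is hypothetical, not part of the library) illustrates the repetition pattern:

```python
from itertools import islice

def toy_generate(batch_repetitions=2):
    """Mimic SimulationGenerator.generate(): an endless stream where
    each fresh batch is yielded `batch_repetitions` times in a row."""
    batch_id = 0
    while True:
        output = f"batch-{batch_id}"  # stands in for (x, y, info)
        for _ in range(batch_repetitions):
            yield output
        batch_id += 1

print(list(islice(toy_generate(), 5)))
# ['batch-0', 'batch-0', 'batch-1', 'batch-1', 'batch-2']
```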

compute_covariance

compute_covariance(Y, cov_type='basic')

Compute the covariance matrix of the data.

Parameters:

  • Y (ndarray): The data matrix. Required.
  • cov_type (str): The type of covariance matrix to compute. Options are 'basic' and 'SSM'. Default: 'basic'.

Returns:

  • C (numpy.ndarray): The covariance matrix.

Source code in invert/simulate/covariance.py
def compute_covariance(Y, cov_type="basic"):
    """Compute the covariance matrix of the data.

    Parameters
    ----------
    Y : numpy.ndarray
        The data matrix.
    cov_type : str
        The type of covariance matrix to compute. Options are 'basic' and 'SSM'. Default is 'basic'.

    Return
    ------
    C : numpy.ndarray
        The covariance matrix.
    """
    if cov_type == "basic":
        C = Y @ Y.T
    elif cov_type == "SSM":
        n_time = Y.shape[1]
        M_Y = Y.T @ Y
        YY = M_Y + 0.001 * (50 / n_time) * np.trace(M_Y) * np.eye(n_time)
        P_Y = (Y @ np.linalg.inv(YY)) @ Y.T
        C = P_Y.T @ P_Y
    elif cov_type == "riemann":
        msg = "Riemannian covariance is not yet implemented as a standalone function."
        raise NotImplementedError(msg)
    else:
        msg = "Covariance type not recognized. Use 'basic', 'SSM' or provide a custom covariance matrix."
        raise ValueError(msg)

    return C
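
As a quick sanity check, the two implemented modes can be exercised with a condensed re-statement of the listing above (covariance here is a local illustration, not the library function):

```python
import numpy as np

def covariance(Y, cov_type="basic"):
    """Condensed re-statement of compute_covariance for illustration."""
    if cov_type == "basic":
        return Y @ Y.T
    if cov_type == "SSM":
        n_time = Y.shape[1]
        M_Y = Y.T @ Y
        # Regularize before inverting the temporal Gram matrix
        YY = M_Y + 0.001 * (50 / n_time) * np.trace(M_Y) * np.eye(n_time)
        P_Y = (Y @ np.linalg.inv(YY)) @ Y.T
        return P_Y.T @ P_Y
    raise ValueError(cov_type)

rng = np.random.default_rng(0)
Y = rng.standard_normal((8, 100))  # e.g. 8 channels, 100 time points

for kind in ("basic", "SSM"):
    C = covariance(Y, kind)
    assert C.shape == (8, 8)
    assert np.allclose(C, C.T)  # both variants are symmetric
```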

gen_correlated_sources

gen_correlated_sources(corr_coeff, T, Q)

Generate Q correlated sources with a specified correlation coefficient. The sources are generated as sinusoids with random frequencies and phases.

Parameters:

  • corr_coeff (float): The correlation coefficient between the sources. Required.
  • T (int): The number of time points in the sources. Required.
  • Q (int): The number of sources to generate. Required.

Returns:

  • Y (ndarray): The generated sources.

Source code in invert/simulate/covariance.py
def gen_correlated_sources(corr_coeff, T, Q):
    """Generate Q correlated sources with a specified correlation coefficient.
    The sources are generated as sinusoids with random frequencies and phases.

    Parameters
    ----------
    corr_coeff : float
        The correlation coefficient between the sources.
    T : int
        The number of time points in the sources.
    Q : int
        The number of sources to generate.

    Returns
    -------
    Y : numpy.ndarray
        The generated sources.
    """
    Cov = np.ones((Q, Q)) * corr_coeff + np.diag(
        np.ones(Q) * (1 - corr_coeff)
    )  # required covariance matrix
    freq = np.random.randint(10, 31, Q)  # random frequencies between 10Hz to 30Hz

    phases = 2 * np.pi * np.random.rand(Q)  # random phases
    t = np.linspace(10 * np.pi / T, 10 * np.pi, T)
    Signals = np.sqrt(2) * np.cos(
        2 * np.pi * freq[:, None] * t + phases[:, None]
    )  # the basic signals

    if corr_coeff < 1:
        A = np.linalg.cholesky(Cov).T  # cholesky Decomposition
        Y = A @ Signals
    else:  # Coherent Sources
        Y = np.tile(Signals[0, :], (Q, 1))

    return Y

get_cov

get_cov(n, corr_coef)

Generate a mixing matrix (the transposed Cholesky factor of a symmetric covariance matrix) that correlates sources to a specified degree.

Source code in invert/simulate/covariance.py
def get_cov(n, corr_coef):
    """Generate a covariance matrix that is symmetric along the
    diagonal that correlates sources to a specified degree."""
    if corr_coef < 1:
        cov = np.ones((n, n)) * corr_coef + np.eye(n) * (1 - corr_coef)
        cov = np.linalg.cholesky(cov)
    else:
        # Make all signals be exactly the first one (perfectly coherent)
        cov = np.zeros((n, n))
        cov[:, 0] = 1
    return cov.T
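
The returned factor is meant to be right-multiplied onto independent timecourses, as done in _generate_patches. A minimal check (re-stating get_cov locally for self-containment) confirms the empirical correlation matches the requested coefficient:

```python
import numpy as np

def get_cov(n, corr_coef):
    """Re-stated from the listing above: transposed Cholesky factor of an
    equicorrelation matrix, or a coherent-copy matrix if corr_coef == 1."""
    if corr_coef < 1:
        cov = np.ones((n, n)) * corr_coef + np.eye(n) * (1 - corr_coef)
        cov = np.linalg.cholesky(cov)
    else:
        cov = np.zeros((n, n))
        cov[:, 0] = 1
    return cov.T

rng = np.random.default_rng(0)
T, n, target = 100_000, 3, 0.5
X = rng.standard_normal((T, n))   # independent timecourses (columns)
Y = X @ get_cov(n, target)        # mixed as in _generate_patches

corr = np.corrcoef(Y.T)
off_diag = corr[np.triu_indices(n, k=1)]
print(off_diag)  # all close to 0.5
```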

add_white_noise

add_white_noise(
    X_clean,
    snr,
    rng,
    channel_types,
    noise_color_coeff=0.5,
    correlation_mode=None,
)

Parameters:

  • X_clean (ndarray): The clean EEG data. Required.
  • snr (float): The signal-to-noise ratio in dB. Required.
  • rng (numpy.random.Generator): Random number generator used to draw the noise. Required.
  • channel_types (array-like): Channel type labels, one per channel; noise is scaled separately per channel type. Required.
  • noise_color_coeff (float): The magnitude of spatial coloring of the noise (not the overall noise magnitude). Default: 0.5.
  • correlation_mode (None or str): None implies no correlation between the noise in different channels. 'banded': colored banded noise, where channels closer to each other are more correlated. 'diagonal': some channels have varying degrees of noise. 'cholesky': a set correlation coefficient between each pair of channels. Default: None.
Source code in invert/simulate/noise.py
def add_white_noise(
    X_clean, snr, rng, channel_types, noise_color_coeff=0.5, correlation_mode=None
):
    """
    Parameters
    ----------
    X_clean : numpy.ndarray
        The clean EEG data.
    snr : float
        The signal to noise ratio in dB.
    correlation_mode : None/str
        None implies no correlation between the noise in different channels.
        'banded' : Colored banded noise, where channels closer to each other will be more correlated.
        'diagonal' : Some channels have varying degrees of noise.
        'cholesky' : A set correlation coefficient between each pair of channels
    noise_color_coeff : float
        The magnitude of spatial coloring of the noise (not the magnitude of noise overall!).
    """
    n_chans, n_time = X_clean.shape
    X_noise = rng.standard_normal((n_chans, n_time))
    snr_linear = 10 ** (snr / 10)

    if isinstance(channel_types, list):
        channel_types = np.array(channel_types)
    # Ensure the channel_types array is correct length
    assert len(channel_types) == n_chans, (
        "Length of channel_types must match the number of channels in X_clean"
    )

    unique_types = np.unique(channel_types)
    X_full = np.zeros_like(X_clean)

    for ch_type in unique_types:
        type_indices = np.where(channel_types == ch_type)[0]
        X_clean_type = X_clean[type_indices, :]
        X_noise_type = X_noise[type_indices, :]
        if isinstance(noise_color_coeff, str) and isinstance(
            correlation_mode, np.ndarray
        ):
            # Real Noise Covariance
            X_noise_type = (
                np.linalg.cholesky(correlation_mode[type_indices][:, type_indices])
                @ X_noise_type
            )
        elif correlation_mode == "cholesky":
            covariance_matrix = np.full(
                (len(type_indices), len(type_indices)), noise_color_coeff
            )
            np.fill_diagonal(covariance_matrix, 1)  # Set diagonal to 1 for variance

            # Cholesky decomposition
            X_noise_type = np.linalg.cholesky(covariance_matrix) @ X_noise_type
        elif correlation_mode == "banded":
            num_sensors = X_noise_type.shape[0]
            Y = np.zeros_like(X_noise_type)
            for i in range(num_sensors):
                Y[i, :] = X_noise_type[i, :]
                for j in range(num_sensors):
                    if abs(i - j) % num_sensors == 1:
                        Y[i, :] += (noise_color_coeff / np.sqrt(2)) * X_noise_type[j, :]
            X_noise_type = Y
        elif correlation_mode == "diagonal":
            X_noise_type[1::3, :] *= 1 - noise_color_coeff
            X_noise_type[2::3, :] *= 1 + noise_color_coeff
        elif correlation_mode is None:
            pass
        else:
            msg = f"correlation_mode can be either None, cholesky, banded or diagonal, but was {correlation_mode}"
            raise AttributeError(msg)

        rms_noise = rms(X_noise_type)
        rms_signal = rms(X_clean_type)
        scaler = rms_signal / (snr_linear * rms_noise)

        X_full[type_indices] = X_clean_type + X_noise_type * scaler

    return X_full
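
Note the scaling convention: the noise is scaled so that rms(signal) / rms(noise) = 10^(snr / 10). A minimal sketch of that rule (using a local rms helper, since the library's rms is internal):

```python
import numpy as np

def rms(a):
    """Root-mean-square amplitude."""
    return np.sqrt(np.mean(a ** 2))

rng = np.random.default_rng(0)
x_clean = np.sin(np.linspace(0, 8 * np.pi, 500))[None, :] * np.ones((4, 1))

snr_db = 5.0
snr_linear = 10 ** (snr_db / 10)

noise = rng.standard_normal(x_clean.shape)
scaler = rms(x_clean) / (snr_linear * rms(noise))  # same rule as add_white_noise
x_noisy = x_clean + noise * scaler

achieved = rms(x_clean) / rms(x_noisy - x_clean)
print(round(achieved, 3), round(snr_linear, 3))  # both ≈ 3.162
```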

powerlaw_noise

powerlaw_noise(beta, n_timepoints, n_signals=1, rng=None)

Generate 1/f^beta colored noise via FFT spectral shaping.

Parameters:

  • beta (float or array-like): Power-law exponent(s). 0 = white, 1 = pink, 2 = brown. If array, must have length n_signals. Required.
  • n_timepoints (int): Number of time samples. Required.
  • n_signals (int): Number of independent signals to generate. Default: 1.
  • rng (numpy.random.Generator or None): Random number generator. Default: None.

Returns:

  • signals (ndarray, shape (n_signals, n_timepoints)): Colored noise signals.

Source code in invert/simulate/noise.py
def powerlaw_noise(beta, n_timepoints, n_signals=1, rng=None):
    """Generate 1/f^beta colored noise via FFT spectral shaping.

    Parameters
    ----------
    beta : float or array-like
        Power-law exponent(s). 0=white, 1=pink, 2=brown.
        If array, must have length n_signals.
    n_timepoints : int
        Number of time samples.
    n_signals : int
        Number of independent signals to generate.
    rng : numpy.random.Generator or None
        Random number generator.

    Returns
    -------
    signals : ndarray, shape (n_signals, n_timepoints)
        Colored noise signals.
    """
    if rng is None:
        rng = np.random.default_rng()

    beta = np.atleast_1d(np.asarray(beta, dtype=float))
    if beta.shape[0] == 1:
        beta = np.broadcast_to(beta, (n_signals,))

    # Generate white noise and FFT
    white = rng.standard_normal((n_signals, n_timepoints))
    spectrum = np.fft.rfft(white)

    # Build frequency-domain filter: 1/f^(beta/2) (power spectrum goes as 1/f^beta)
    freqs = np.fft.rfftfreq(n_timepoints)
    # Skip DC (index 0) to avoid division by zero
    freq_filter = np.ones((n_signals, len(freqs)))
    freq_filter[:, 1:] = freqs[1:][np.newaxis, :] ** (-beta[:, np.newaxis] / 2.0)

    spectrum *= freq_filter
    signals = np.fft.irfft(spectrum, n=n_timepoints)

    return signals
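To sanity-check the spectral shaping, one can regenerate colored noise with the same FFT recipe and fit the log-log slope of the averaged power spectrum; for `beta = 2` (brown noise) the fitted slope should sit near -2. A standalone sketch, reimplementing the filter inline rather than importing `invert`:

```python
import numpy as np

rng = np.random.default_rng(42)
beta, n_signals, n_timepoints = 2.0, 400, 2048

# Same recipe as powerlaw_noise: shape white noise by 1/f^(beta/2) in the frequency domain
white = rng.standard_normal((n_signals, n_timepoints))
spectrum = np.fft.rfft(white)
freqs = np.fft.rfftfreq(n_timepoints)
freq_filter = np.ones(len(freqs))
freq_filter[1:] = freqs[1:] ** (-beta / 2.0)  # skip DC to avoid division by zero
signals = np.fft.irfft(spectrum * freq_filter, n=n_timepoints)

# Average the periodogram over signals, then fit log-power vs log-frequency
power = np.mean(np.abs(np.fft.rfft(signals)) ** 2, axis=0)
slope = np.polyfit(np.log(freqs[1:]), np.log(power[1:]), 1)[0]
print(round(slope, 2))  # close to -beta, i.e. about -2
```

Averaging over many signals is what makes the slope estimate stable; a single periodogram is far too noisy for a reliable fit.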

generator

generator(
    fwd,
    use_cov=True,
    cov_type="basic",
    batch_size=1284,
    batch_repetitions=30,
    n_sources=10,
    n_orders=2,
    amplitude_range=(0.5, 1),
    n_timepoints=20,
    snr_range=(-5, 5),
    n_timecourses=5000,
    beta_range=(0, 3),
    return_mask=True,
    scale_data=True,
    return_info=False,
    add_forward_error=False,
    forward_error=0.1,
    remove_channel_dim=False,
    inter_source_correlation=(0.25, 0.75),
    diffusion_smoothing=True,
    diffusion_parameter=0.1,
    correlation_mode=None,
    noise_color_coeff=0.5,
    random_seed=None,
    normalize_leadfield=False,
    verbose=0,
)

**Deprecated.** Use `SimulationGenerator` instead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fwd` | object | Forward solution object containing the source space and orientation information. | required |
| `use_cov` | bool | If True, the covariance matrix of the simulated EEG is returned instead of the raw time series. | `True` |
| `cov_type` | str | Type of covariance to compute (passed to `compute_covariance`). | `'basic'` |
| `batch_size` | int | Size of each batch of simulations. | `1284` |
| `batch_repetitions` | int | Number of repetitions of each batch. | `30` |
| `n_sources` | int or tuple | Number of active sources, or a (min, max) range to sample from. | `10` |
| `n_orders` | int or tuple | Smoothness order(s) of the spatial source patterns. | `2` |
| `amplitude_range` | tuple | Range of possible amplitudes for the simulated sources. | `(0.5, 1)` |
| `n_timepoints` | int | Number of timepoints in each simulated time course. | `20` |
| `snr_range` | tuple | Range of signal-to-noise ratios (in dB) used in the simulations. | `(-5, 5)` |
| `n_timecourses` | int | Number of unique time courses to simulate. | `5000` |
| `beta_range` | tuple | Range of possible power-law exponents for the power spectral density of the simulated sources. | `(0, 3)` |
| `return_mask` | bool | If True, a binary mask of the active sources is returned instead of the source amplitudes. | `True` |
| `scale_data` | bool | If True, the EEG data will be scaled. | `True` |
| `return_info` | bool | If True, a DataFrame with information about the generated data is also yielded. | `False` |
| `add_forward_error` | bool | If True, an error is added to the forward model. | `False` |
| `forward_error` | float | Amount of error to add to the forward model if `add_forward_error` is True. | `0.1` |
| `remove_channel_dim` | bool | If True, the channel dimension is removed from the output data. | `False` |
| `inter_source_correlation` | float or tuple | Level of correlation between sources, or a (min, max) range to sample from. | `(0.25, 0.75)` |
| `diffusion_smoothing` | bool | Whether to use diffusion smoothing. | `True` |
| `diffusion_parameter` | float | The diffusion parameter (alpha). | `0.1` |
| `correlation_mode` | None or str | Spatial correlation of the sensor noise: `None` (uncorrelated channels), `'cholesky'` (noise mixed through a sampled covariance matrix), `'banded'` (neighboring channels more correlated), `'diagonal'` (varying noise level per channel), or `'auto'` (a mode drawn at random per simulation). | `None` |
| `noise_color_coeff` | float or tuple | Magnitude of spatial coloring of the noise. | `0.5` |
| `random_seed` | None or int | Random seed for replicable simulations. | `None` |
| `normalize_leadfield` | bool | If True, the leadfield columns are normalized to unit norm. | `False` |
| `verbose` | int | Level of verbosity for the function. | `0` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `x` | numpy.ndarray | The EEG data matrix (covariance matrices if `use_cov` is True). |
| `y` | numpy.ndarray | The source data matrix (a binary source mask if `return_mask` is True). |

This function is a generator: it yields `(x, y)` batches indefinitely, or `(x, y, info)` when `return_info` is True.

Source code in invert/simulate/simulate.py
def generator(
    fwd,
    use_cov=True,
    cov_type="basic",
    batch_size=1284,
    batch_repetitions=30,
    n_sources=10,
    n_orders=2,
    amplitude_range=(0.5, 1),
    n_timepoints=20,
    snr_range=(-5, 5),
    n_timecourses=5000,
    beta_range=(0, 3),
    return_mask=True,
    scale_data=True,
    return_info=False,
    add_forward_error=False,
    forward_error=0.1,
    remove_channel_dim=False,
    inter_source_correlation=(0.25, 0.75),
    diffusion_smoothing=True,
    diffusion_parameter=0.1,
    correlation_mode=None,
    noise_color_coeff=0.5,
    random_seed=None,
    normalize_leadfield=False,
    verbose=0,
):
    """
    .. deprecated::
        Use :class:`SimulationGenerator` instead.

    Parameters
    ----------
    fwd : object
        Forward solution object containing the source space and orientation information.
    use_cov : bool
        If True, the covariance matrix of the simulated EEG is returned instead of
        the raw time series. Default is True.
    cov_type : str
        Type of covariance to compute (passed to ``compute_covariance``). Default is "basic".
    batch_size : int
        Size of each batch of simulations. Default is 1284.
    batch_repetitions : int
        Number of repetitions of each batch. Default is 30.
    n_sources : int
        Number of sources in the brain from which activity is simulated. Default is 10.
    n_orders : int
        The order of the model used to generate time courses. Default is 2.
    amplitude_range : tuple
        Range of possible amplitudes for the simulated sources. Default is (0.5, 1).
    n_timepoints : int
        Number of timepoints in each simulated time course. Default is 20.
    snr_range : tuple
        Range of signal-to-noise ratios (in dB) to be used in the simulations. Default is (-5, 5).
    n_timecourses : int
        Number of unique time courses to simulate. Default is 5000.
    beta_range : tuple
        Range of possible power-law exponents for the power spectral density of the simulated sources. Default is (0, 3).
    return_mask : bool
        If True, the function will also return a mask of the sources used. Default is True.
    scale_data : bool
        If True, the EEG data will be scaled. Default is True.
    return_info : bool
        If True, the function will return a dictionary with information about the generated data. Default is False.
    add_forward_error : bool
        If True, the function will add an error to the forward model. Default is False.
    forward_error : float
        Amount of error to add to the forward model if 'add_forward_error' is True. Default is 0.1.
    remove_channel_dim : bool
        If True, the channel dimension will be removed from the output data. Default is False.
    inter_source_correlation : float | tuple
        The level of correlation between different sources. Default is (0.25, 0.75).
    diffusion_smoothing : bool
        Whether to use diffusion smoothing. Default is True.
    diffusion_parameter : float
        The diffusion parameter (alpha). Default is 0.1.
    correlation_mode : None | str
        Spatial correlation of the sensor noise.
        None implies no correlation between the noise in different channels.
        'cholesky' : Noise mixed through a sampled covariance matrix.
        'banded' : Colored banded noise, where channels closer to each other are more correlated.
        'diagonal' : Channels have varying degrees of noise.
        'auto' : A mode is drawn at random for each simulation.
    noise_color_coeff : float
        The magnitude of spatial coloring of the noise.
    random_seed : None / int
        The random seed for replicable simulations
    normalize_leadfield : bool
        If True, the leadfield columns are normalized to unit norm. Default is False.
    verbose : int
        Level of verbosity for the function. Default is 0.

    Returns
    -------
    x : numpy.ndarray
        The EEG data matrix.
    y : numpy.ndarray
        The source data matrix.
    """
    warnings.warn(
        "generator() is deprecated. Use SimulationGenerator instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    rng = np.random.default_rng(random_seed)
    channel_types = fwd["info"].get_channel_types()
    leadfield = deepcopy(fwd["sol"]["data"])
    leadfield_original = deepcopy(fwd["sol"]["data"])
    n_chans, n_dipoles = leadfield.shape

    if isinstance(n_sources, (int, float)):
        n_sources = [np.clip(n_sources, a_min=1, a_max=np.inf), n_sources]
    min_sources, max_sources = n_sources

    adjacency = build_adjacency(fwd, verbose=verbose)

    sources, sources_dense, gradient = build_spatial_basis(
        adjacency,
        n_dipoles,
        0 if not isinstance(n_orders, (tuple, list)) else n_orders[0],
        n_orders if not isinstance(n_orders, (tuple, list)) else n_orders[1],
        diffusion_smoothing=diffusion_smoothing,
        diffusion_parameter=diffusion_parameter,
    )

    # Parse n_orders for min/max
    if isinstance(n_orders, (tuple, list)):
        min_order, max_order = n_orders
        if min_order == max_order:
            max_order += 1
    else:
        min_order = 0
        max_order = n_orders

    del adjacency

    # Normalize columns of the leadfield
    if normalize_leadfield:
        leadfield /= np.linalg.norm(leadfield, axis=0)

    if isinstance(inter_source_correlation, (tuple, list)):

        def get_inter_source_correlation_fn(n=1):
            return rng.uniform(
                inter_source_correlation[0], inter_source_correlation[1], n
            )
    else:

        def get_inter_source_correlation_fn(n=1):
            return rng.uniform(inter_source_correlation, inter_source_correlation, n)

    if isinstance(noise_color_coeff, (tuple, list)):

        def get_noise_color_coeff_fn(n=1):
            return rng.uniform(noise_color_coeff[0], noise_color_coeff[1], n)
    else:

        def get_noise_color_coeff_fn(n=1):
            return rng.uniform(noise_color_coeff, noise_color_coeff, n)

    if isinstance(correlation_mode, str) and correlation_mode.lower() == "auto":
        correlation_modes = ["cholesky", "banded", "diagonal", None]
        correlation_modes = rng.choice(correlation_modes, batch_size)
    else:
        correlation_modes = [
            correlation_mode,
        ] * batch_size

    n_candidates = sources.shape[0]

    # Pre-compute random time courses using vectorized FFT
    betas = rng.uniform(*beta_range, n_timecourses)
    time_courses = powerlaw_noise(betas, n_timepoints, n_signals=n_timecourses, rng=rng)

    # Normalize time course to max(abs()) == 1
    time_courses = (time_courses.T / abs(time_courses).max(axis=1)).T

    while True:
        if add_forward_error:
            leadfield = add_error(leadfield_original, forward_error, gradient, rng)
        # select sources or source patches
        n_sources_batch = rng.integers(min_sources, max_sources + 1, batch_size)
        selection = [rng.integers(0, n_candidates, n) for n in n_sources_batch]

        amplitude_values = [rng.uniform(*amplitude_range, n) for n in n_sources_batch]
        choices = [rng.choice(n_timecourses, n) for n in n_sources_batch]
        amplitudes = [time_courses[choice].T for choice in choices]

        inter_source_correlations = get_inter_source_correlation_fn(n=batch_size)
        if not isinstance(noise_color_coeff, str):
            noise_color_coeffs = get_noise_color_coeff_fn(n=batch_size)
        else:
            noise_color_coeffs = [
                noise_color_coeff,
            ] * batch_size
        source_covariances = [
            get_cov(n, isc)
            for n, isc in zip(n_sources_batch, inter_source_correlations)
        ]
        amplitudes = [
            amp @ np.diag(amplitude_values[i]) @ cov
            for i, (amp, cov) in enumerate(zip(amplitudes, source_covariances))
        ]

        y = np.stack(
            [
                (amplitudes[i] @ sources_dense[selection[i]]) / len(amplitudes[i])
                for i in range(batch_size)
            ],
            axis=0,
        )

        # Project simulated sources through leadfield (vectorized)
        x = np.einsum("cd,bdt->bct", leadfield, np.swapaxes(y, 1, 2))

        # Add white noise to clean EEG
        snr_levels = rng.uniform(low=snr_range[0], high=snr_range[1], size=batch_size)
        x = np.stack(
            [
                add_white_noise(
                    xx,
                    snr_level,
                    rng,
                    channel_types,
                    correlation_mode=corr_mode,
                    noise_color_coeff=noise_color_level,
                )
                for (xx, snr_level, corr_mode, noise_color_level) in zip(
                    x, snr_levels, correlation_modes, noise_color_coeffs
                )
            ],
            axis=0,
        )

        if use_cov:
            # Calculate Covariance

            x = np.stack(
                [compute_covariance(xx, cov_type=cov_type) for xx in x], axis=0
            )

            # Normalize Covariance to abs. max. of 1
            x = np.stack([C / np.max(abs(C)) for C in x], axis=0)

            if not remove_channel_dim:
                x = np.expand_dims(x, axis=-1)
        else:
            if scale_data:
                x = np.stack([xx / np.max(abs(xx)) for xx in x], axis=0)
            # Reshape
            x = np.swapaxes(x, 1, 2)

        if return_mask:
            # (1) binary
            # Calculate mean source activity
            y = abs(y).mean(axis=1)
            # Masking the source vector (1-> active, 0-> inactive)
            y = (y > 0).astype(float)
        else:
            if scale_data:
                y = np.stack([(yy.T / np.max(abs(yy), axis=1)).T for yy in y], axis=0)

        if return_info:
            info = pd.DataFrame(
                dict(
                    n_sources=n_sources_batch,
                    amplitudes=amplitude_values,
                    snr=snr_levels,
                    inter_source_correlations=inter_source_correlations,
                    n_orders=[
                        [min_order, max_order],
                    ]
                    * batch_size,
                    diffusion_parameter=[
                        diffusion_parameter,
                    ]
                    * batch_size,
                    n_timepoints=[
                        n_timepoints,
                    ]
                    * batch_size,
                    n_timecourses=[
                        n_timecourses,
                    ]
                    * batch_size,
                    correlation_mode=[
                        correlation_mode,
                    ]
                    * batch_size,
                    noise_color_coeff=noise_color_coeffs,
                    centers=selection,
                )
            )
            output = (x, y, info)
        else:
            output = (x, y)

        for _ in range(batch_repetitions):
            yield output

generator_simple

generator_simple(
    fwd,
    batch_size,
    corrs,
    T,
    n_sources,
    SNR_range,
    random_seed=42,
    return_info=True,
)

**Deprecated.** Use `SimulationGenerator` instead.

Source code in invert/simulate/simulate.py
def generator_simple(
    fwd, batch_size, corrs, T, n_sources, SNR_range, random_seed=42, return_info=True
):
    """
    .. deprecated::
        Use :class:`SimulationGenerator` instead.
    """
    warnings.warn(
        "generator_simple() is deprecated. Use SimulationGenerator instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    rng = np.random.default_rng(random_seed)
    leadfield = deepcopy(fwd["sol"]["data"])
    leadfield /= leadfield.std(axis=0)
    n_channels, n_dipoles = leadfield.shape

    while True:
        sim_info = list()
        X = np.zeros((batch_size, n_channels, T))
        y = np.zeros((batch_size, n_dipoles, T))
        corrs_batch = rng.uniform(corrs[0], corrs[1], batch_size)
        SNR_batch = rng.uniform(SNR_range[0], SNR_range[1], batch_size)
        indices = [
            rng.choice(fwd["sol"]["data"].shape[1], n_sources)
            for _ in range(batch_size)
        ]

        for i in range(batch_size):
            X[i], y[i] = generator_single_simple(
                leadfield,
                corrs_batch[i],
                T,
                n_sources,
                indices[i],
                SNR_batch[i],
                random_seed=random_seed,
            )
            d = dict(
                n_sources=n_sources,
                amplitudes=1,
                snr=SNR_batch[i],
                inter_source_correlations=corrs_batch[i],
                n_orders=[0, 0],
                diffusion_parameter=0,
                n_timepoints=T,
                n_timecourses=np.inf,
                iid_noise=True,
            )
            sim_info.append(d)
        if return_info:
            sim_info = pd.DataFrame(sim_info)
            yield X, y, sim_info
        else:
            yield X, y

generator_single_simple

generator_single_simple(
    leadfield,
    corr,
    T,
    n_sources,
    indices,
    SNR,
    random_seed=42,
)

**Deprecated.** Use `SimulationGenerator` instead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `leadfield` | ndarray | The leadfield matrix. | required |
| `corr` | float | The correlation coefficient between the sources. | required |
| `T` | int | The number of time points in the sources. | required |
| `n_sources` | int | The number of sources to generate. | required |
| `indices` | list | The indices of the sources to generate. | required |
| `SNR` | float | The signal-to-noise ratio in dB. | required |
| `random_seed` | int | The random seed for replicable simulations. | `42` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `X` | numpy.ndarray | The simulated EEG data. |
| `y` | numpy.ndarray | The simulated source data. |

Source code in invert/simulate/simulate.py
def generator_single_simple(
    leadfield, corr, T, n_sources, indices, SNR, random_seed=42
):
    """
    .. deprecated::
        Use :class:`SimulationGenerator` instead.

    Parameters
    ----------
    leadfield : numpy.ndarray
        The leadfield matrix.
    corr : float
        The correlation coefficient between the sources.
    T : int
        The number of time points in the sources.
    n_sources : int
        The number of sources to generate.
    indices : list
        The indices of the sources to generate.
    SNR : float
        The signal-to-noise ratio in dB.
    random_seed : int
        The random seed for replicable simulations.

    Returns
    -------
    X : numpy.ndarray
        The simulated EEG data.
    y: numpy.ndarray
        The simulated source data.
    """
    warnings.warn(
        "generator_single_simple() is deprecated. Use SimulationGenerator instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    rng = np.random.default_rng(random_seed)

    S = gen_correlated_sources(corr, T, n_sources)
    M = leadfield[:, indices] @ S  # use Ground Truth Gain matrix
    n_channels, n_dipoles = leadfield.shape

    scale = np.max(abs(M))
    Ms = M * scale
    MEG_energy = np.trace(Ms @ Ms.T) / (n_channels * T)
    noise_var = MEG_energy / (10 ** (SNR / 10))
    Noise = rng.standard_normal((n_channels, T)) * np.sqrt(noise_var)
    X = Ms + Noise
    y = np.zeros((n_dipoles, T))
    y[indices, :] = S

    return X, y
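The noise level in this function follows the standard dB convention: the average signal power per sample is divided by `10 ** (SNR / 10)` to obtain the noise variance. A small standalone sketch of that relationship, with toy data in place of a real leadfield projection:

```python
import numpy as np

rng = np.random.default_rng(7)
n_channels, T = 32, 200
snr_db = 10.0  # target SNR in dB

M = rng.standard_normal((n_channels, T))             # toy clean sensor data
signal_power = np.trace(M @ M.T) / (n_channels * T)  # average power per sample
noise_var = signal_power / (10 ** (snr_db / 10))     # dB -> linear power ratio
noise = rng.standard_normal((n_channels, T)) * np.sqrt(noise_var)

achieved_db = 10 * np.log10(signal_power / np.var(noise))
print(achieved_db)  # close to the 10 dB target
```

The achieved SNR deviates slightly from the target because the empirical noise variance fluctuates around `noise_var` for finite samples.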

build_adjacency

build_adjacency(forward, verbose=0)

Build sparse adjacency matrix from an MNE forward solution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `forward` | dict | MNE forward solution object. | required |
| `verbose` | int | Verbosity level. | `0` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `adjacency` | csr_matrix | Sparse adjacency matrix. |

Source code in invert/simulate/spatial.py
def build_adjacency(forward, verbose=0):
    """Build sparse adjacency matrix from an MNE forward solution.

    Parameters
    ----------
    forward : dict
        MNE forward solution object.
    verbose : int
        Verbosity level.

    Returns
    -------
    adjacency : csr_matrix
        Sparse adjacency matrix.
    """
    adjacency = mne.spatial_src_adjacency(forward["src"], verbose=verbose)
    return csr_matrix(adjacency)

build_spatial_basis

build_spatial_basis(
    adjacency,
    n_dipoles,
    min_order,
    max_order,
    diffusion_smoothing=True,
    diffusion_parameter=0.1,
)

Build multi-order spatial basis from adjacency matrix.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `adjacency` | csr_matrix | Sparse adjacency matrix. | required |
| `n_dipoles` | int | Number of dipoles (source space size). | required |
| `min_order` | int | Minimum smoothing order to include. | required |
| `max_order` | int | Maximum smoothing order (exclusive). | required |
| `diffusion_smoothing` | bool | Whether to use diffusion smoothing (True) or absolute Laplacian (False). | `True` |
| `diffusion_parameter` | float | Diffusion parameter alpha for smoothing. | `0.1` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `sources` | csr_matrix | Stacked spatial basis (sparse). |
| `sources_dense` | ndarray | Dense version for fast indexing. |
| `gradient` | csr_matrix | The gradient/smoothing operator. |

Source code in invert/simulate/spatial.py
def build_spatial_basis(
    adjacency,
    n_dipoles,
    min_order,
    max_order,
    diffusion_smoothing=True,
    diffusion_parameter=0.1,
):
    """Build multi-order spatial basis from adjacency matrix.

    Parameters
    ----------
    adjacency : csr_matrix
        Sparse adjacency matrix.
    n_dipoles : int
        Number of dipoles (source space size).
    min_order : int
        Minimum smoothing order to include.
    max_order : int
        Maximum smoothing order (exclusive).
    diffusion_smoothing : bool
        Whether to use diffusion smoothing (True) or absolute Laplacian (False).
    diffusion_parameter : float
        Diffusion parameter alpha for smoothing.

    Returns
    -------
    sources : csr_matrix
        Stacked spatial basis (sparse).
    sources_dense : ndarray
        Dense version for fast indexing.
    gradient : csr_matrix
        The gradient/smoothing operator.
    """
    if diffusion_smoothing:
        gradient = np.identity(n_dipoles) - diffusion_parameter * laplacian(adjacency)
    else:
        gradient = abs(laplacian(adjacency))

    gradient = csr_matrix(gradient)

    # Build multi-order source basis using iterative sparse multiplication
    last_block = csr_matrix(np.identity(n_dipoles))
    blocks = [last_block]

    for _i in range(1, max_order):
        last_block = last_block @ gradient

        # Normalize each column (with guard against zero max)
        row_maxes = last_block.max(axis=0).toarray().flatten()
        row_maxes[row_maxes == 0] = 1.0
        last_block = last_block / row_maxes[np.newaxis]

        blocks.append(last_block)

    sources = vstack(blocks[min_order:])
    sources_dense = sources.toarray()

    return sources, sources_dense, gradient
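The diffusion operator `I - alpha * L` spreads activity from a seed dipole to its graph neighbors, and stacking its powers yields source patches of increasing extent. A toy illustration on a three-node path graph (a hypothetical minimal source space, not using `invert` itself):

```python
import numpy as np
from scipy.sparse.csgraph import laplacian

# Path graph 0 - 1 - 2
adjacency = np.array([[0, 1, 0],
                      [1, 0, 1],
                      [0, 1, 0]], dtype=float)
alpha = 0.1
gradient = np.identity(3) - alpha * laplacian(adjacency)

seed = np.array([0.0, 1.0, 0.0])  # point source at the middle node
patch = gradient @ seed           # first-order smoothing
print(patch)  # activity leaks to both neighbors: [0.1 0.8 0.1]
```

Applying `gradient` repeatedly (as the loop over `max_order` does above) produces progressively wider, smoother patches centered on the seed.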

SimulationConfig

invert.simulate.SimulationConfig

Bases: BaseModel

Configuration for EEG simulation generator.

Attributes:

- `batch_size`: Number of simulations per batch
- `batch_repetitions`: Number of times to repeat each batch
- `n_sources`: Number of active sources (int or tuple for range)
- `n_orders`: Smoothness order(s) for spatial patterns
- `amplitude_range`: Min/max amplitude for source activity
- `n_timepoints`: Number of time samples per simulation
- `snr_range`: Signal-to-noise ratio range in dB
- `n_timecourses`: Number of pre-generated timecourses
- `beta_range`: Power-law exponent range for 1/f noise
- `add_forward_error`: Whether to add perturbations to leadfield
- `forward_error`: Magnitude of forward model error
- `inter_source_correlation`: Correlation between sources (float or range)
- `diffusion_smoothing`: Whether to use diffusion-based smoothing
- `diffusion_parameter`: Smoothing strength (alpha parameter)
- `correlation_mode`: Spatial noise correlation pattern
- `noise_color_coeff`: Spatial noise correlation strength
- `random_seed`: Random seed for reproducibility
- `normalize_leadfield`: Whether to normalize leadfield columns
- `verbose`: Verbosity level
- `simulation_mode`: Simulation mode ('patches' or 'mixture')
- `background_beta`: 1/f^beta exponent for smooth background
- `background_mixture_alpha`: Mixing coefficient alpha (higher = more background)

Source code in invert/simulate/config.py
class SimulationConfig(BaseModel):
    """Configuration for EEG simulation generator.

    Attributes:
        batch_size: Number of simulations per batch
        batch_repetitions: Number of times to repeat each batch
        n_sources: Number of active sources (int or tuple for range)
        n_orders: Smoothness order(s) for spatial patterns
        amplitude_range: Min/max amplitude for source activity
        n_timepoints: Number of time samples per simulation
        snr_range: Signal-to-noise ratio range in dB
        n_timecourses: Number of pre-generated timecourses
        beta_range: Power-law exponent range for 1/f noise
        add_forward_error: Whether to add perturbations to leadfield
        forward_error: Magnitude of forward model error
        inter_source_correlation: Correlation between sources (float or range)
        diffusion_smoothing: Whether to use diffusion-based smoothing
        diffusion_parameter: Smoothing strength (alpha parameter)
        correlation_mode: Spatial noise correlation pattern
        noise_color_coeff: Spatial noise correlation strength
        random_seed: Random seed for reproducibility
        normalize_leadfield: Whether to normalize leadfield columns
        verbose: Verbosity level
        simulation_mode: Simulation mode ('patches' or 'mixture')
        background_beta: 1/f^beta exponent for smooth background
        background_mixture_alpha: Mixing coefficient alpha (higher = more background)
    """

    batch_size: int = Field(default=1284, ge=1)
    batch_repetitions: int = Field(default=1, ge=1)
    n_sources: Union[int, tuple[int, int]] = Field(
        default=(1, 5), description="Single value or (min, max) tuple"
    )
    n_orders: Union[int, tuple[int, int]] = Field(
        default=(0, 3), description="Smoothness order or (min, max) tuple"
    )
    amplitude_range: tuple[float, float] = Field(
        default=(0.5, 1.0), description="Source amplitude range"
    )
    n_timepoints: int = Field(default=20, ge=1)
    snr_range: tuple[float, float] = Field(
        default=(-5.0, 5.0), description="SNR range in dB"
    )
    n_timecourses: int = Field(default=5000, ge=1)
    beta_range: tuple[float, float] = Field(default=(0.0, 3.0))
    add_forward_error: bool = Field(default=False)
    forward_error: float = Field(default=0.1, ge=0.0)
    inter_source_correlation: Union[float, tuple[float, float]] = Field(
        default=(0.25, 0.75)
    )
    diffusion_smoothing: bool = Field(default=True)
    diffusion_parameter: float = Field(default=0.1, ge=0.0)
    correlation_mode: Optional[
        Union[Literal["auto", "cholesky", "banded", "diagonal"], None]
    ] = Field(default=None)
    noise_color_coeff: Union[float, tuple[float, float]] = Field(default=(0.25, 0.75))
    random_seed: Optional[int] = Field(default=None)
    normalize_leadfield: bool = Field(default=False)
    verbose: int = Field(default=0, ge=0)

    # Mixture mode parameters (simple background + patches)
    simulation_mode: Literal["patches", "mixture"] = Field(
        default="patches",
        description="Simulation mode: 'patches' (sparse only) or 'mixture' (smooth background + patches)",
    )
    background_beta: Union[float, tuple[float, float]] = Field(
        default=(0.5, 2.0),
        description="1/f^beta exponent for smooth background temporal dynamics",
    )
    background_mixture_alpha: Union[float, tuple[float, float]] = Field(
        default=(0.7, 0.9),
        description="Mixing coefficient alpha: y = alpha*y_background + (1-alpha)*y_patches",
    )

    model_config = ConfigDict(frozen=False)
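In 'mixture' mode, `background_mixture_alpha` blends a smooth background with sparse patches as `y = alpha * y_background + (1 - alpha) * y_patches`, per the field description above. A minimal numpy sketch of that convex mix, using hypothetical toy arrays in place of the generator's real background and patch activity:

```python
import numpy as np

rng = np.random.default_rng(0)
n_dipoles, n_timepoints = 100, 20
alpha = 0.8  # higher alpha -> more background, less patch activity

y_background = rng.standard_normal((n_dipoles, n_timepoints))  # stand-in for smooth 1/f background
y_patches = np.zeros((n_dipoles, n_timepoints))
y_patches[10:15] = 1.0                                         # stand-in for one active patch

y = alpha * y_background + (1 - alpha) * y_patches
print(y.shape)  # (100, 20)
```

With `alpha` sampled from `(0.7, 0.9)` by default, the patches remain a relatively small perturbation on top of the background activity.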

SimulationGenerator

invert.simulate.SimulationGenerator

Class-based EEG simulation generator with precomputed components.

This generator creates realistic EEG simulations by:

1. Generating spatially smooth source patterns
2. Assigning colored noise timecourses to sources
3. Projecting through the leadfield matrix
4. Adding spatially/temporally colored sensor noise

The class precomputes spatial smoothing operators and timecourses during initialization for faster batch generation.

Source code in invert/simulate/simulate.py
class SimulationGenerator:
    """Class-based EEG simulation generator with precomputed components.

    This generator creates realistic EEG simulations by:
    1. Generating spatially smooth source patterns
    2. Assigning colored noise timecourses to sources
    3. Projecting through the leadfield matrix
    4. Adding spatially/temporally colored sensor noise

    The class precomputes spatial smoothing operators and timecourses
    during initialization for faster batch generation.
    """

    def __init__(self, fwd, config: SimulationConfig | None = None, **kwargs):
        """Initialize the simulation generator.

        Parameters:
            fwd: MNE forward solution object
            config: SimulationConfig instance (optional)
            **kwargs: Configuration parameters (used if config is None)
        """
        # Initialize configuration
        if config is None:
            self.config = SimulationConfig(**kwargs)
        else:
            self.config = config

        # Store forward solution components
        self.fwd = fwd
        self.channel_types = np.array(fwd["info"].get_channel_types())
        self.leadfield_original = deepcopy(fwd["sol"]["data"])
        self.leadfield = deepcopy(self.leadfield_original)
        self.n_chans, self.n_dipoles = self.leadfield.shape

        # Initialize random number generator
        self.rng = np.random.default_rng(self.config.random_seed)

        # Parse n_sources parameter
        if isinstance(self.config.n_sources, (int, float)):
            n_sources_val = np.clip(self.config.n_sources, a_min=1, a_max=np.inf)
            self.min_sources, self.max_sources = int(n_sources_val), int(n_sources_val)
        else:
            self.min_sources, self.max_sources = self.config.n_sources

        # Precompute spatial smoothing operator
        self._precompute_spatial_operators()

        # Precompute timecourses
        self._precompute_timecourses()

        # Setup correlation sampling functions
        self._setup_correlation_samplers()

        # Normalize leadfield if requested
        if self.config.normalize_leadfield:
            self.leadfield /= np.linalg.norm(self.leadfield, axis=0)

    def _precompute_spatial_operators(self):
        """Precompute graph Laplacian and smoothing operators."""
        self.adjacency = build_adjacency(self.fwd, verbose=self.config.verbose)

        # Parse n_orders parameter
        if isinstance(self.config.n_orders, (tuple, list)):
            self.min_order, self.max_order = self.config.n_orders
            if self.min_order == self.max_order:
                self.max_order += 1
        else:
            self.min_order = 0
            self.max_order = self.config.n_orders

        self.sources, self.sources_dense, self.gradient = build_spatial_basis(
            self.adjacency,
            self.n_dipoles,
            self.min_order,
            self.max_order,
            diffusion_smoothing=self.config.diffusion_smoothing,
            diffusion_parameter=self.config.diffusion_parameter,
        )
        self.n_candidates = self.sources.shape[0]

    def _precompute_timecourses(self):
        """Precompute colored noise timecourses."""
        betas = self.rng.uniform(*self.config.beta_range, self.config.n_timecourses)

        time_courses = powerlaw_noise(
            betas,
            self.config.n_timepoints,
            n_signals=self.config.n_timecourses,
            rng=self.rng,
        )

        # Normalize to max(abs()) == 1
        self.time_courses = (time_courses.T / abs(time_courses).max(axis=1)).T

    def _setup_correlation_samplers(self):
        """Setup sampling functions for correlation parameters."""
        isc = self.config.inter_source_correlation
        if isinstance(isc, (tuple, list)):
            self.get_inter_source_correlation = lambda n=1: self.rng.uniform(
                isc[0], isc[1], n
            )
        else:
            self.get_inter_source_correlation = lambda n=1: np.full(n, isc)

        ncc = self.config.noise_color_coeff
        if isinstance(ncc, (tuple, list)):
            self.get_noise_color_coeff = lambda n=1: self.rng.uniform(ncc[0], ncc[1], n)
        else:
            self.get_noise_color_coeff = lambda n=1: np.full(n, ncc)

    def _generate_smooth_background(self, batch_size):
        """Generate smooth background activity with 1/f^beta temporal dynamics.

        Uses vectorized FFT-based colored noise generation instead of
        per-dipole Python loops.

        Parameters:
            batch_size: Number of simulations to generate

        Returns:
            y_background: [batch_size, n_dipoles, n_timepoints] background activity
        """
        # Sample beta parameters for background
        if isinstance(self.config.background_beta, tuple):
            betas = self.rng.uniform(*self.config.background_beta, batch_size)
        else:
            betas = np.full(batch_size, self.config.background_beta)

        y_background_all = np.empty(
            (batch_size, self.n_dipoles, self.config.n_timepoints)
        )

        for b_idx, beta in enumerate(betas):
            # Vectorized: generate all dipole timecourses at once
            background_timecourses = powerlaw_noise(
                beta, self.config.n_timepoints, n_signals=self.n_dipoles, rng=self.rng
            )  # [n_dipoles, n_timepoints]

            # Apply spatial smoothing using gradient operator
            background_smooth = self.gradient @ background_timecourses

            # Normalize
            max_val = np.max(np.abs(background_smooth))
            if max_val > 0:
                background_smooth = background_smooth / max_val
            y_background_all[b_idx] = background_smooth

        return y_background_all

    def _setup_leadfield(self):
        """Get the leadfield matrix, optionally with forward model error."""
        if self.config.add_forward_error:
            return add_error(
                self.leadfield_original,
                self.config.forward_error,
                self.gradient,
                self.rng,
            )
        return self.leadfield

    def _generate_patches(self, batch_size):
        """Generate patch-based source activity.

        Returns:
            y_patches: [batch_size, n_dipoles, n_timepoints] patch activity
            selection: list of source index arrays
            amplitude_values: list of amplitude arrays
            inter_source_correlations: array of correlation values
            noise_color_coeffs: array of noise color coefficients
        """
        n_sources_batch = self.rng.integers(
            self.min_sources, self.max_sources + 1, batch_size
        )

        # Select source locations
        selection = [
            self.rng.integers(0, self.n_candidates, n) for n in n_sources_batch
        ]

        # Sample amplitudes and timecourses
        amplitude_values = [
            self.rng.uniform(*self.config.amplitude_range, n) for n in n_sources_batch
        ]
        timecourse_choices = [
            self.rng.choice(self.config.n_timecourses, n) for n in n_sources_batch
        ]
        amplitudes = [self.time_courses[choice].T for choice in timecourse_choices]

        # Apply inter-source correlations
        inter_source_correlations = self.get_inter_source_correlation(n=batch_size)
        noise_color_coeffs = self.get_noise_color_coeff(n=batch_size)

        source_covariances = [
            get_cov(n, isc)
            for n, isc in zip(n_sources_batch, inter_source_correlations)
        ]
        amplitudes = [
            amp @ np.diag(amplitude_values[i]) @ cov
            for i, (amp, cov) in enumerate(zip(amplitudes, source_covariances))
        ]

        # Generate patch activity using dense source matrix for fast indexing
        y_patches = np.stack(
            [
                (amplitudes[i] @ self.sources_dense[selection[i]]).T
                / len(amplitudes[i])
                for i in range(batch_size)
            ],
            axis=0,
        )

        return (
            y_patches,
            n_sources_batch,
            selection,
            amplitude_values,
            inter_source_correlations,
            noise_color_coeffs,
        )

    def _generate_background(self, batch_size, y_patches):
        """Mix background activity with patches if in mixture mode.

        Returns:
            y: [batch_size, n_dipoles, n_timepoints] combined activity
            alphas: mixing coefficients or None
        """
        if self.config.simulation_mode == "mixture":
            y_background = self._generate_smooth_background(batch_size)

            if isinstance(self.config.background_mixture_alpha, tuple):
                alphas = self.rng.uniform(
                    *self.config.background_mixture_alpha, batch_size
                )
            else:
                alphas = np.full(batch_size, self.config.background_mixture_alpha)

            alphas_bc = alphas[:, np.newaxis, np.newaxis]
            y = alphas_bc * y_background + (1 - alphas_bc) * y_patches
        else:
            y = y_patches
            alphas = None

        return y, alphas

    def _apply_noise(self, x, batch_size, noise_color_coeffs, modes_batch):
        """Apply sensor noise to EEG data.

        Returns:
            x: [batch_size, n_channels, n_timepoints] noisy EEG data
            snr_levels: array of SNR values used
        """
        snr_levels = self.rng.uniform(
            low=self.config.snr_range[0],
            high=self.config.snr_range[1],
            size=batch_size,
        )

        x = np.stack(
            [
                add_white_noise(
                    xx,
                    snr_level,
                    self.rng,
                    self.channel_types,
                    correlation_mode=corr_mode,
                    noise_color_coeff=noise_color_level,
                )
                for (xx, snr_level, corr_mode, noise_color_level) in zip(
                    x, snr_levels, modes_batch, noise_color_coeffs
                )
            ],
            axis=0,
        )

        return x, snr_levels

    def _build_metadata(
        self,
        batch_size,
        n_sources_batch,
        amplitude_values,
        snr_levels,
        inter_source_correlations,
        noise_color_coeffs,
        selection,
        alphas,
    ):
        """Build simulation metadata DataFrame."""
        info_dict = {
            "n_sources": n_sources_batch,
            "amplitudes": amplitude_values,
            "snr": snr_levels,
            "inter_source_correlations": inter_source_correlations,
            "n_orders": [[self.min_order, self.max_order]] * batch_size,
            "diffusion_parameter": [self.config.diffusion_parameter] * batch_size,
            "n_timepoints": [self.config.n_timepoints] * batch_size,
            "n_timecourses": [self.config.n_timecourses] * batch_size,
            "correlation_mode": [self.config.correlation_mode] * batch_size,
            "noise_color_coeff": noise_color_coeffs,
            "centers": selection,
            "simulation_mode": [self.config.simulation_mode] * batch_size,
        }

        if self.config.simulation_mode == "mixture":
            info_dict.update(
                {
                    "background_beta": [self.config.background_beta] * batch_size,
                    "background_mixture_alpha": alphas,
                }
            )

        return pd.DataFrame(info_dict)

    def generate(self):
        """Generate batches of simulations.

        Yields:
            tuple: (x, y, info) where:
                - x: EEG data [batch_size, n_channels, n_timepoints]
                - y: Source activity [batch_size, n_dipoles, n_timepoints] (scaled)
                - info: DataFrame with simulation metadata
        """
        # Setup correlation modes
        if (
            isinstance(self.config.correlation_mode, str)
            and self.config.correlation_mode.lower() == "auto"
        ):
            correlation_modes = ["cholesky", "banded", "diagonal", None]
            modes_batch = self.rng.choice(correlation_modes, self.config.batch_size)
        else:
            modes_batch = [self.config.correlation_mode] * self.config.batch_size

        while True:
            leadfield = self._setup_leadfield()

            (
                y_patches,
                n_sources_batch,
                selection,
                amplitude_values,
                inter_source_correlations,
                noise_color_coeffs,
            ) = self._generate_patches(self.config.batch_size)

            y, alphas = self._generate_background(self.config.batch_size, y_patches)

            # Vectorized leadfield projection
            x = np.einsum("cd,bdt->bct", leadfield, y)

            x, snr_levels = self._apply_noise(
                x, self.config.batch_size, noise_color_coeffs, modes_batch
            )

            info = self._build_metadata(
                self.config.batch_size,
                n_sources_batch,
                amplitude_values,
                snr_levels,
                inter_source_correlations,
                noise_color_coeffs,
                selection,
                alphas,
            )

            output = (x, y, info)

            for _ in range(self.config.batch_repetitions):
                yield output
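The leadfield projection inside `generate` contracts the dipole axis for every batch element in a single `einsum` call. A minimal NumPy sketch (made-up shapes, not the library's actual dimensions) shows that this is equivalent to projecting each simulation in a loop:

```python
import numpy as np

rng = np.random.default_rng(0)

n_channels, n_dipoles, batch_size, n_timepoints = 32, 100, 4, 50
leadfield = rng.standard_normal((n_channels, n_dipoles))        # [c, d]
y = rng.standard_normal((batch_size, n_dipoles, n_timepoints))  # [b, d, t]

# Vectorized projection: contract the dipole axis for every batch element.
x = np.einsum("cd,bdt->bct", leadfield, y)                      # [b, c, t]

# Equivalent per-sample loop, for comparison.
x_loop = np.stack([leadfield @ y_b for y_b in y], axis=0)

assert np.allclose(x, x_loop)
```

The `einsum` form avoids Python-level looping over the batch, which matters when generating many samples.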

__init__

__init__(
    fwd, config: SimulationConfig | None = None, **kwargs
)

Initialize the simulation generator.

Parameters:

- `fwd`: MNE forward solution object
- `config`: `SimulationConfig` instance (optional)
- `**kwargs`: Configuration parameters (used if `config` is None)

Source code in invert/simulate/simulate.py
def __init__(self, fwd, config: SimulationConfig | None = None, **kwargs):
    """Initialize the simulation generator.

    Parameters:
        fwd: MNE forward solution object
        config: SimulationConfig instance (optional)
        **kwargs: Configuration parameters (used if config is None)
    """
    # Initialize configuration
    if config is None:
        self.config = SimulationConfig(**kwargs)
    else:
        self.config = config

    # Store forward solution components
    self.fwd = fwd
    self.channel_types = np.array(fwd["info"].get_channel_types())
    self.leadfield_original = deepcopy(fwd["sol"]["data"])
    self.leadfield = deepcopy(self.leadfield_original)
    self.n_chans, self.n_dipoles = self.leadfield.shape

    # Initialize random number generator
    self.rng = np.random.default_rng(self.config.random_seed)

    # Parse n_sources parameter
    if isinstance(self.config.n_sources, (int, float)):
        n_sources_val = np.clip(self.config.n_sources, a_min=1, a_max=np.inf)
        self.min_sources, self.max_sources = int(n_sources_val), int(n_sources_val)
    else:
        self.min_sources, self.max_sources = self.config.n_sources

    # Precompute spatial smoothing operator
    self._precompute_spatial_operators()

    # Precompute timecourses
    self._precompute_timecourses()

    # Setup correlation sampling functions
    self._setup_correlation_samplers()

    # Normalize leadfield if requested
    if self.config.normalize_leadfield:
        self.leadfield /= np.linalg.norm(self.leadfield, axis=0)

generate

generate()

Generate batches of simulations.

Yields:

tuple `(x, y, info)` where:

- `x`: EEG data `[batch_size, n_channels, n_timepoints]`
- `y`: Source activity `[batch_size, n_dipoles, n_timepoints]` (scaled)
- `info`: DataFrame with simulation metadata

Source code in invert/simulate/simulate.py
def generate(self):
    """Generate batches of simulations.

    Yields:
        tuple: (x, y, info) where:
            - x: EEG data [batch_size, n_channels, n_timepoints]
            - y: Source activity [batch_size, n_dipoles, n_timepoints] (scaled)
            - info: DataFrame with simulation metadata
    """
    # Setup correlation modes
    if (
        isinstance(self.config.correlation_mode, str)
        and self.config.correlation_mode.lower() == "auto"
    ):
        correlation_modes = ["cholesky", "banded", "diagonal", None]
        modes_batch = self.rng.choice(correlation_modes, self.config.batch_size)
    else:
        modes_batch = [self.config.correlation_mode] * self.config.batch_size

    while True:
        leadfield = self._setup_leadfield()

        (
            y_patches,
            n_sources_batch,
            selection,
            amplitude_values,
            inter_source_correlations,
            noise_color_coeffs,
        ) = self._generate_patches(self.config.batch_size)

        y, alphas = self._generate_background(self.config.batch_size, y_patches)

        # Vectorized leadfield projection
        x = np.einsum("cd,bdt->bct", leadfield, y)

        x, snr_levels = self._apply_noise(
            x, self.config.batch_size, noise_color_coeffs, modes_batch
        )

        info = self._build_metadata(
            self.config.batch_size,
            n_sources_batch,
            amplitude_values,
            snr_levels,
            inter_source_correlations,
            noise_color_coeffs,
            selection,
            alphas,
        )

        output = (x, y, info)

        for _ in range(self.config.batch_repetitions):
            yield output
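In `'mixture'` mode the yielded sources blend smooth background activity with patch activity, using one mixing coefficient per simulation. A toy sketch of that broadcasting step (arbitrary shapes, plain NumPy):

```python
import numpy as np

rng = np.random.default_rng(1)

batch_size, n_dipoles, n_timepoints = 3, 10, 20
y_patches = rng.standard_normal((batch_size, n_dipoles, n_timepoints))
y_background = rng.standard_normal((batch_size, n_dipoles, n_timepoints))

# One mixing coefficient per simulation, broadcast over dipoles and time.
alphas = rng.uniform(0.2, 0.8, batch_size)
alphas_bc = alphas[:, np.newaxis, np.newaxis]  # [b, 1, 1]

y = alphas_bc * y_background + (1 - alphas_bc) * y_patches
```

Higher `alpha` means more background: `alpha = 0` returns the patches unchanged, `alpha = 1` returns pure background.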

Noise Functions

invert.simulate.add_white_noise

add_white_noise(
    X_clean,
    snr,
    rng,
    channel_types,
    noise_color_coeff=0.5,
    correlation_mode=None,
)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `X_clean` | ndarray | The clean EEG data. | required |
| `snr` | float | The signal-to-noise ratio in dB. | required |
| `rng` | numpy.random.Generator | Random number generator used to draw the noise. | required |
| `channel_types` | array-like of str | Channel type of each channel in `X_clean`; noise is scaled separately per type. | required |
| `noise_color_coeff` | float | The magnitude of spatial coloring of the noise (not the magnitude of the noise overall!). | 0.5 |
| `correlation_mode` | None or str | `None`: no correlation between the noise in different channels. `'banded'`: colored banded noise, where channels closer to each other are more correlated. `'diagonal'`: some channels have varying degrees of noise. `'cholesky'`: a set correlation coefficient between each pair of channels. | None |
Source code in invert/simulate/noise.py
def add_white_noise(
    X_clean, snr, rng, channel_types, noise_color_coeff=0.5, correlation_mode=None
):
    """
    Parameters
    ----------
    X_clean : numpy.ndarray
        The clean EEG data.
    snr : float
        The signal to noise ratio in dB.
    rng : numpy.random.Generator
        Random number generator used to draw the noise.
    channel_types : array-like of str
        Channel type of each channel in X_clean; noise is scaled per type.
    correlation_mode : None/str
        None implies no correlation between the noise in different channels.
        'banded' : Colored banded noise, where channels closer to each other will be more correlated.
        'diagonal' : Some channels have varying degrees of noise.
        'cholesky' : A set correlation coefficient between each pair of channels
    noise_color_coeff : float
        The magnitude of spatial coloring of the noise (not the magnitude of noise overall!).
    """
    n_chans, n_time = X_clean.shape
    X_noise = rng.standard_normal((n_chans, n_time))
    snr_linear = 10 ** (snr / 10)

    if isinstance(channel_types, list):
        channel_types = np.array(channel_types)
    # Ensure the channel_types array is correct length
    assert len(channel_types) == n_chans, (
        "Length of channel_types must match the number of channels in X_clean"
    )

    unique_types = np.unique(channel_types)
    X_full = np.zeros_like(X_clean)

    for ch_type in unique_types:
        type_indices = np.where(channel_types == ch_type)[0]
        X_clean_type = X_clean[type_indices, :]
        X_noise_type = X_noise[type_indices, :]
        if isinstance(noise_color_coeff, str) and isinstance(
            correlation_mode, np.ndarray
        ):
            # Real Noise Covariance
            X_noise_type = (
                np.linalg.cholesky(correlation_mode[type_indices][:, type_indices])
                @ X_noise_type
            )
        elif correlation_mode == "cholesky":
            covariance_matrix = np.full(
                (len(type_indices), len(type_indices)), noise_color_coeff
            )
            np.fill_diagonal(covariance_matrix, 1)  # Set diagonal to 1 for variance

            # Cholesky decomposition
            X_noise_type = np.linalg.cholesky(covariance_matrix) @ X_noise_type
        elif correlation_mode == "banded":
            num_sensors = X_noise_type.shape[0]
            Y = np.zeros_like(X_noise_type)
            for i in range(num_sensors):
                Y[i, :] = X_noise_type[i, :]
                for j in range(num_sensors):
                    if abs(i - j) % num_sensors == 1:
                        Y[i, :] += (noise_color_coeff / np.sqrt(2)) * X_noise_type[j, :]
            X_noise_type = Y
        elif correlation_mode == "diagonal":
            X_noise_type[1::3, :] *= 1 - noise_color_coeff
            X_noise_type[2::3, :] *= 1 + noise_color_coeff
        elif correlation_mode is None:
            pass
        else:
            msg = f"correlation_mode can be either None, cholesky, banded or diagonal, but was {correlation_mode}"
            raise AttributeError(msg)

        rms_noise = rms(X_noise_type)
        rms_signal = rms(X_clean_type)
        scaler = rms_signal / (snr_linear * rms_noise)

        X_full[type_indices] = X_clean_type + X_noise_type * scaler

    return X_full
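Within each channel type, the noise is scaled so that the RMS ratio of signal to scaled noise equals `10 ** (snr / 10)`. A self-contained sketch of just that scaling step, with white noise only and no spatial coloring:

```python
import numpy as np

def rms(x):
    """Root-mean-square over all elements."""
    return np.sqrt(np.mean(x ** 2))

rng = np.random.default_rng(2)
n_chans, n_time = 16, 500
X_clean = rng.standard_normal((n_chans, n_time))
X_noise = rng.standard_normal((n_chans, n_time))

snr_db = 5.0
snr_linear = 10 ** (snr_db / 10)

# Scale the noise so that rms(signal) / rms(scaled noise) == snr_linear.
scaler = rms(X_clean) / (snr_linear * rms(X_noise))
X_full = X_clean + X_noise * scaler

achieved = rms(X_clean) / rms(X_noise * scaler)
assert np.isclose(achieved, snr_linear)
```

The spatial coloring modes ('cholesky', 'banded', 'diagonal') only reshape the noise before this step; the final RMS scaling is the same.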

invert.simulate.powerlaw_noise

powerlaw_noise(beta, n_timepoints, n_signals=1, rng=None)

Generate 1/f^beta colored noise via FFT spectral shaping.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `beta` | float or array-like | Power-law exponent(s). 0 = white, 1 = pink, 2 = brown. If array, must have length `n_signals`. | required |
| `n_timepoints` | int | Number of time samples. | required |
| `n_signals` | int | Number of independent signals to generate. | 1 |
| `rng` | numpy.random.Generator or None | Random number generator. | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `signals` | ndarray, shape (n_signals, n_timepoints) | Colored noise signals. |

Source code in invert/simulate/noise.py
def powerlaw_noise(beta, n_timepoints, n_signals=1, rng=None):
    """Generate 1/f^beta colored noise via FFT spectral shaping.

    Parameters
    ----------
    beta : float or array-like
        Power-law exponent(s). 0=white, 1=pink, 2=brown.
        If array, must have length n_signals.
    n_timepoints : int
        Number of time samples.
    n_signals : int
        Number of independent signals to generate.
    rng : numpy.random.Generator or None
        Random number generator.

    Returns
    -------
    signals : ndarray, shape (n_signals, n_timepoints)
        Colored noise signals.
    """
    if rng is None:
        rng = np.random.default_rng()

    beta = np.atleast_1d(np.asarray(beta, dtype=float))
    if beta.shape[0] == 1:
        beta = np.broadcast_to(beta, (n_signals,))

    # Generate white noise and FFT
    white = rng.standard_normal((n_signals, n_timepoints))
    spectrum = np.fft.rfft(white)

    # Build frequency-domain filter: 1/f^(beta/2) (power spectrum goes as 1/f^beta)
    freqs = np.fft.rfftfreq(n_timepoints)
    # Skip DC (index 0) to avoid division by zero
    freq_filter = np.ones((n_signals, len(freqs)))
    freq_filter[:, 1:] = freqs[1:][np.newaxis, :] ** (-beta[:, np.newaxis] / 2.0)

    spectrum *= freq_filter
    signals = np.fft.irfft(spectrum, n=n_timepoints)

    return signals
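The spectral shaping can be verified empirically: after filtering, the averaged power spectrum should fall off as `1/f^beta`, i.e. a line of slope `-beta` in log-log coordinates. A standalone check that mirrors the FFT steps of the listed source:

```python
import numpy as np

rng = np.random.default_rng(3)
n_signals, n_timepoints, beta = 200, 1024, 2.0

# Shape white noise in the frequency domain (amplitude ~ 1/f^(beta/2)).
white = rng.standard_normal((n_signals, n_timepoints))
spectrum = np.fft.rfft(white)
freqs = np.fft.rfftfreq(n_timepoints)
freq_filter = np.ones_like(freqs)
freq_filter[1:] = freqs[1:] ** (-beta / 2.0)  # skip DC to avoid 1/0
signals = np.fft.irfft(spectrum * freq_filter, n=n_timepoints)

# The averaged power spectrum should follow 1/f^beta: a straight line
# with slope -beta in log-log coordinates.
power = np.mean(np.abs(np.fft.rfft(signals)) ** 2, axis=0)
slope = np.polyfit(np.log(freqs[1:]), np.log(power[1:]), 1)[0]
assert abs(slope - (-beta)) < 0.3
```

With `beta = 2` (brown noise) the fitted slope lands close to -2; averaging over many signals keeps the per-bin spectral fluctuations small.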

Covariance Utilities

invert.simulate.compute_covariance

compute_covariance(Y, cov_type='basic')

Compute the covariance matrix of the data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `Y` | ndarray | The data matrix. | required |
| `cov_type` | str | The type of covariance matrix to compute. Options are `'basic'` and `'SSM'`. | `'basic'` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `C` | ndarray | The covariance matrix. |

Source code in invert/simulate/covariance.py
def compute_covariance(Y, cov_type="basic"):
    """Compute the covariance matrix of the data.

    Parameters
    ----------
    Y : numpy.ndarray
        The data matrix.
    cov_type : str
        The type of covariance matrix to compute. Options are 'basic' and 'SSM'. Default is 'basic'.

    Returns
    -------
    C : numpy.ndarray
        The covariance matrix.
    """
    if cov_type == "basic":
        C = Y @ Y.T
    elif cov_type == "SSM":
        n_time = Y.shape[1]
        M_Y = Y.T @ Y
        YY = M_Y + 0.001 * (50 / n_time) * np.trace(M_Y) * np.eye(n_time)
        P_Y = (Y @ np.linalg.inv(YY)) @ Y.T
        C = P_Y.T @ P_Y
    elif cov_type == "riemann":
        msg = "Riemannian covariance is not yet implemented as a standalone function."
        raise NotImplementedError(msg)
    else:
        msg = "Covariance type not recognized. Use 'basic', 'SSM' or provide a custom covariance matrix."
        raise ValueError(msg)

    return C
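The two covariance types can be contrasted directly. The sketch below reproduces both branches of the listed source on random data and checks that each yields a symmetric `n_chans x n_chans` matrix:

```python
import numpy as np

rng = np.random.default_rng(4)
n_chans, n_time = 8, 40
Y = rng.standard_normal((n_chans, n_time))

# 'basic': plain outer product of the data with itself.
C_basic = Y @ Y.T

# 'SSM': covariance built from the regularized signal-subspace
# projector, following the listed source.
M_Y = Y.T @ Y
YY = M_Y + 0.001 * (50 / n_time) * np.trace(M_Y) * np.eye(n_time)
P_Y = (Y @ np.linalg.inv(YY)) @ Y.T
C_ssm = P_Y.T @ P_Y

assert C_basic.shape == C_ssm.shape == (n_chans, n_chans)
assert np.allclose(C_basic, C_basic.T)
assert np.allclose(C_ssm, C_ssm.T)
```

The regularization term in `YY` keeps the inverse well-conditioned when `n_time` is large relative to the effective signal rank.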

invert.simulate.gen_correlated_sources

gen_correlated_sources(corr_coeff, T, Q)

Generate Q correlated sources with a specified correlation coefficient. The sources are generated as sinusoids with random frequencies and phases.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `corr_coeff` | float | The correlation coefficient between the sources. | required |
| `T` | int | The number of time points in the sources. | required |
| `Q` | int | The number of sources to generate. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Y` | ndarray | The generated sources. |

Source code in invert/simulate/covariance.py
def gen_correlated_sources(corr_coeff, T, Q):
    """Generate Q correlated sources with a specified correlation coefficient.
    The sources are generated as sinusoids with random frequencies and phases.

    Parameters
    ----------
    corr_coeff : float
        The correlation coefficient between the sources.
    T : int
        The number of time points in the sources.
    Q : int
        The number of sources to generate.

    Returns
    -------
    Y : numpy.ndarray
        The generated sources.
    """
    Cov = np.ones((Q, Q)) * corr_coeff + np.diag(
        np.ones(Q) * (1 - corr_coeff)
    )  # required covariance matrix
    freq = np.random.randint(10, 31, Q)  # random frequencies between 10Hz to 30Hz

    phases = 2 * np.pi * np.random.rand(Q)  # random phases
    t = np.linspace(10 * np.pi / T, 10 * np.pi, T)
    Signals = np.sqrt(2) * np.cos(
        2 * np.pi * freq[:, None] * t + phases[:, None]
    )  # the basic signals

    if corr_coeff < 1:
        A = np.linalg.cholesky(Cov)  # lower Cholesky factor L, with L @ L.T == Cov
        Y = A @ Signals
    else:  # Coherent Sources
        Y = np.tile(Signals[0, :], (Q, 1))

    return Y
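The Cholesky mixing step can be checked empirically. The sketch below substitutes white noise for the sinusoidal base signals (a simplification, so the empirical correlation converges cleanly) and verifies that mixing with the lower Cholesky factor of the target covariance produces the requested pairwise correlation:

```python
import numpy as np

rng = np.random.default_rng(5)
corr_coeff, T, Q = 0.6, 20000, 3

# Target covariance: corr_coeff off the diagonal, 1 on the diagonal.
Cov = np.ones((Q, Q)) * corr_coeff + np.diag(np.ones(Q) * (1 - corr_coeff))

# Unit-variance, uncorrelated base signals (white noise instead of the
# sinusoids used by the library, purely for this check).
Signals = rng.standard_normal((Q, T))

# Lower Cholesky factor L satisfies L @ L.T == Cov, so the mixed
# signals L @ Signals have (asymptotically) the target covariance.
L = np.linalg.cholesky(Cov)
Y = L @ Signals

empirical = np.corrcoef(Y)
off_diag = empirical[~np.eye(Q, dtype=bool)]
assert np.all(np.abs(off_diag - corr_coeff) < 0.05)
```

Note that the lower factor must multiply from the left: with the transposed factor the product covariance would be `L.T @ L`, which differs from `Cov` in general.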