
Block-Champagne

Solver ID: BLOCK-CHAMPAGNE

Usage

from invert import Solver

# fwd = ...    (mne.Forward object)
# evoked = ... (mne.Evoked object)

solver = Solver("BLOCK-CHAMPAGNE")
solver.make_inverse_operator(fwd, evoked)
stc = solver.apply_inverse_operator(evoked)
stc.plot()
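
Hyperparameters can also be set by constructing the solver class directly. A minimal sketch, assuming SolverBlockChampagne is importable from the module path given in the API reference below (keyword arguments taken from its __init__ signature):

from invert.solvers.bayesian.block_champagne import SolverBlockChampagne

# fwd, evoked as above
solver = SolverBlockChampagne(r=0.9, max_iter=300, damping=0.1)
solver.make_inverse_operator(fwd, evoked)
stc = solver.apply_inverse_operator(evoked)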

Overview

Block-Champagne is a sparse Bayesian learning solver that places overlapping local blocks on the source space (a local homogeneity prior) and learns a diagonal residual noise covariance alongside the block variances.
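
To make the prior concrete, here is an illustrative numpy sketch (toy code, not part of the library) of the source covariance the blocks imply, following the expressions in the source comments below: H is the block incidence matrix (H[i, j] = 1 if voxel j belongs to block(i)), gamma holds the block variances, and r sets the intra-block correlation.

import numpy as np

n_sources = 5
# Toy 1-D chain: block(i) = {i-1, i, i+1}, clipped at the ends.
H = np.zeros((n_sources, n_sources))
for i in range(n_sources):
    for j in (i - 1, i, i + 1):
        if 0 <= j < n_sources:
            H[i, j] = 1.0

gamma = np.array([0.0, 2.0, 0.0, 0.0, 0.5])  # sparse block variances
r = 0.95                                     # intra-block correlation

# Cov(S) = (1-r) diag(H.T @ gamma) + r * H.T @ diag(gamma) @ H
w = H.T @ gamma
cov_s = (1.0 - r) * np.diag(w) + r * (H.T * gamma) @ H
# A single nonzero gamma[i] spreads correlated variance over the whole
# block(i) neighborhood: that is the local homogeneity prior.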

References

  1. Feng, Z., Guan, C., & Sun, Y. (2025). Block-Champagne: A Novel Bayesian Framework for Imaging Extended E/MEG Source. IEEE Transactions on Medical Imaging.

API Reference

Bases: BaseSolver

Block-Champagne: Champagne with local overlapping blocks and noise learning.

Source code in invert/solvers/bayesian/block_champagne.py
class SolverBlockChampagne(BaseSolver):
    """Block-Champagne: Champagne with local overlapping blocks and noise learning."""

    meta = SolverMeta(
        slug="block-champagne",
        full_name="Block-Champagne",
        category="Bayesian",
        description=(
            "Block-Champagne sparse Bayesian learning with overlapping local blocks "
            "(local homogeneity prior) and diagonal residual noise learning."
        ),
        references=[
            "Feng, Z., Guan, C., & Sun, Y. (2025). Block-Champagne: A Novel Bayesian Framework for Imaging Extended E/MEG Source. IEEE Transactions on Medical Imaging.",
        ],
    )

    def __init__(
        self,
        name: str = "BlockChampagne",
        *,
        adjacency_type: str = "spatial",
        adjacency_distance: float = 3e-3,
        r: float = 0.95,
        augment_noise_activity: bool = False,
        v_noise_activity: float = 1.0,
        max_iter: int = 500,
        pruning_thresh: float = 1e-3,
        convergence_criterion: float = 1e-8,
        damping: float = 0.0,
        eps: float = 1e-15,
        rank_tol: float = 1e-12,
        **kwargs,
    ) -> None:
        self.name = str(name)
        self.adjacency_type = str(adjacency_type)
        self.adjacency_distance = float(adjacency_distance)
        self.r = float(r)
        self.augment_noise_activity = bool(augment_noise_activity)
        self.v_noise_activity = float(v_noise_activity)
        self.max_iter = int(max_iter)
        self.pruning_thresh = float(pruning_thresh)
        self.convergence_criterion = float(convergence_criterion)
        self.damping = float(damping)
        self.eps = float(eps)
        self.rank_tol = float(rank_tol)

        self._block_geometry: _BlockGeometry | None = None
        super().__init__(**kwargs)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def make_inverse_operator(
        self,
        forward,
        mne_obj=None,
        *args,
        alpha: str | float = "auto",
        noise_cov: mne.Covariance | None = None,
        **kwargs,
    ):
        super().make_inverse_operator(forward, mne_obj, *args, alpha=alpha, **kwargs)

        # SSP projection only (no whitening). Block-Champagne learns a diagonal
        # residual noise covariance internally (Eq. 18).
        wf = self.prepare_whitened_forward(None)
        self._sensor_projector = wf.projector

        data = self.unpack_data_obj(mne_obj)
        data_proj = wf.sensor_transform @ np.asarray(data, dtype=float)
        leadfield_proj = np.asarray(wf.G_white, dtype=float)

        # Data centering helps identifiability for noise learning variants.
        data_proj = data_proj - data_proj.mean(axis=1, keepdims=True)

        data_cov = self.data_covariance(data_proj, center=True, ddof=1)
        # Block-Champagne learns diagonal noise (beta) internally via
        # Convexity/sqrt updates, converging to the same solution regardless
        # of the initial noise level.  A single alpha avoids a wasted grid
        # search and spurious edge-of-grid GCV warnings.
        if self.alpha == "auto":
            n_chans = data_proj.shape[0]
            self.alphas = [float(np.trace(data_cov) / n_chans)]
        else:
            self.get_alphas(reference=data_cov)

        geom = self._prepare_blocks(n_sources=leadfield_proj.shape[1])
        # U = L @ H.T, where each u_i is the sum of leadfield columns in block(i).
        # Compute once per make_inverse_operator and reuse across alphas.
        U = (geom.H @ leadfield_proj.T).T  # (n_chans, n_sources)

        inverse_operators: list[np.ndarray] = []
        for a in self.alphas:
            W_proj = self._fit_block_champagne(
                B=data_proj,
                L=leadfield_proj,
                U=U,
                geom=geom,
                alpha=float(a),
            )
            inverse_operators.append(W_proj @ wf.sensor_transform)

        self.inverse_operators = [
            InverseOperator(op, self.name) for op in inverse_operators
        ]
        return self

    # ------------------------------------------------------------------
    # Block geometry
    # ------------------------------------------------------------------
    def _prepare_blocks(self, *, n_sources: int) -> _BlockGeometry:
        if self._block_geometry is not None:
            if self._block_geometry.H.shape[0] == int(n_sources):
                return self._block_geometry

        adjacency = build_source_adjacency(
            self.forward["src"],
            adjacency_type=self.adjacency_type,
            adjacency_distance=self.adjacency_distance,
            verbose=0,
        ).tocsr()
        if adjacency.shape[0] != int(n_sources):
            raise ValueError(
                f"Adjacency shape {adjacency.shape} does not match n_sources={n_sources}."
            )

        block_sizes = np.empty(n_sources, dtype=int)
        blocks: list[np.ndarray] = []
        for i in range(n_sources):
            start, end = adjacency.indptr[i], adjacency.indptr[i + 1]
            neigh = adjacency.indices[start:end].astype(int, copy=False)
            block = np.unique(np.concatenate([np.array([i], dtype=int), neigh]))
            blocks.append(block)
            block_sizes[i] = int(block.size)

        # Build neighborhood incidence matrix H (rows: blocks i, cols: voxels j).
        # H[i, j] = 1 if j is in block(i).
        rows: list[int] = []
        cols: list[int] = []
        for i, block in enumerate(blocks):
            rows.extend([i] * int(block.size))
            cols.extend(block.tolist())
        data = np.ones(len(rows), dtype=float)
        H = csr_matrix(
            (data, (np.asarray(rows), np.asarray(cols))), shape=(n_sources, n_sources)
        )
        H.sum_duplicates()
        H.eliminate_zeros()
        H_T = H.T.tocsc()

        geom = _BlockGeometry(H=H, H_T=H_T, block_sizes=block_sizes)
        self._block_geometry = geom
        return geom

    # ------------------------------------------------------------------
    # Core loop (Eq. 10, 16, 18)
    # ------------------------------------------------------------------
    def _fit_block_champagne(
        self,
        *,
        B: np.ndarray,  # (n_chans, n_times)
        L: np.ndarray,  # (n_chans, n_sources)
        U: np.ndarray,  # (n_chans, n_sources) where U[:, i] = sum_{j in block(i)} L[:, j]
        geom: _BlockGeometry,
        alpha: float,
    ) -> np.ndarray:
        n_chans, _n_times = B.shape
        n_sources = L.shape[1]
        if int(geom.H.shape[0]) != int(n_sources):
            raise ValueError("Block geometry does not match leadfield n_sources.")
        if U.shape != (n_chans, n_sources):
            raise ValueError(f"U has shape {U.shape}, expected {(n_chans, n_sources)}")

        # Initialize hyperparameters
        gamma_blocks = np.ones(n_sources, dtype=float)
        beta = np.full(
            n_chans, max(alpha, self.eps), dtype=float
        )  # residual diag noise
        v = float(max(self.v_noise_activity, self.eps))

        n_times = int(B.shape[1])
        eye_ch = np.eye(n_chans, dtype=float)
        loss_prev: float | None = None
        best_state: tuple[float, np.ndarray, np.ndarray] | None = None

        # Preallocate scaled matrices to avoid per-iteration allocations.
        Lw = np.empty_like(L)
        Ug = np.empty_like(U)

        for i_iter in range(self.max_iter):
            # ------------------------------------------------------------------
            # Model covariance Sigma_b (Eq. 10), computed via the exact algebraic
            # simplification described in the module docstring:
            #   Sigma_sources = (1-r) L diag(w) L^T  +  r U diag(gamma) U^T
            # where w = H.T @ gamma and U[:, i] = sum_{j in block(i)} L[:, j].
            # ------------------------------------------------------------------
            w = np.asarray(geom.H_T @ gamma_blocks, dtype=float).ravel()  # (n_sources,)
            w = np.maximum(w, 0.0)

            np.multiply(L, w[np.newaxis], out=Lw)
            Sigma_b = (1.0 - self.r) * (Lw @ L.T)

            np.multiply(U, gamma_blocks[np.newaxis], out=Ug)
            Sigma_b = Sigma_b + self.r * (Ug @ U.T)

            Sigma_b = Sigma_b + np.diag(beta)
            if self.augment_noise_activity:
                Sigma_b = Sigma_b + v * eye_ch
            Sigma_b = 0.5 * (Sigma_b + Sigma_b.T)

            # Invert (with rank-tolerant fallback)
            try:
                Sigma_b_inv = np.linalg.inv(Sigma_b)
            except np.linalg.LinAlgError:
                Sigma_b_inv = np.linalg.pinv(Sigma_b, rcond=self.rank_tol)

            # Posterior mean (Eq. 10) for hatZ (and optionally E)
            SiB = Sigma_b_inv @ B  # (n_chans, n_times)
            # We do not explicitly form hatZ. For the diagonal-noise update we only
            # need the posterior mean residual:
            #   residual = B - L * E[S] = Sigma_n * Sigma_b^{-1} B = diag(beta) * SiB.
            # This identity follows from Sigma_b = L Cov(S) L^T + Sigma_n (+ vI).
            residual = beta[:, np.newaxis] * SiB

            # Loss (Eq. 11)
            data_fit = float(np.trace(B.T @ SiB) / n_times)
            log_det = _safe_slogdet_sym(Sigma_b)
            loss = data_fit + log_det

            if best_state is None or loss < best_state[0]:
                best_state = (loss, gamma_blocks.copy(), beta.copy())

            # Convergence check
            if loss_prev is not None:
                rel_change = (loss_prev - loss) / (abs(loss_prev) + 1e-20)
                if rel_change > 0 and rel_change < self.convergence_criterion:
                    loss_prev = loss
                    break
            loss_prev = loss

            # ------------------------------------------------------------------
            # Gamma updates (Eq. 16) without forming hatZ explicitly.
            #
            # For block i, let A_i be the leadfield restricted to block(i) columns.
            # The convexity/MM update needs:
            #   upper_i = (1/T) ||mu_hatZ_i||_F^2
            #   lower_i = trace(L_i^T Sigma_b^{-1} L_i)
            # with L_i = A_i M_i and M_i M_i^T = C_i(r) = (1-r)I + r 11^T.
            #
            # Using C_i directly yields (exactly):
            #   lower_i = (1-r) * sum_{j in block(i)} z_j  +  r * u_i^T Sigma_b^{-1} u_i
            #   upper_i = gamma_i^2 * [ (1-r) * sum_{j in block(i)} t_j
            #                          +  r * u_i^T (SiB SiB^T) u_i ] / T
            # where:
            #   z_j = l_j^T Sigma_b^{-1} l_j              (diag of Fisher info)
            #   t_j = ||l_j^T Sigma_b^{-1} B||_2^2        (time-energy in SiB space)
            #   u_i = sum_{j in block(i)} l_j             (U column i)
            # ------------------------------------------------------------------
            SiL = Sigma_b_inv @ L  # (n_chans, n_sources)
            z = np.sum(L * SiL, axis=0)  # (n_sources,)

            # SiB SiB^T is small (n_chans x n_chans) and reused several times.
            G = SiB @ SiB.T  # (n_chans, n_chans)
            GL = G @ L
            t = np.sum(L * GL, axis=0)  # (n_sources,) == diag(L.T G L)

            # Neighborhood sums: (H @ z)[i] = sum_{j in block(i)} z_j.
            z_sum = np.asarray(geom.H @ z, dtype=float).ravel()
            t_sum = np.asarray(geom.H @ t, dtype=float).ravel()

            # u_i^T Sigma_b^{-1} u_i and u_i^T G u_i for all i via diagonal of U.T M U.
            SiU = Sigma_b_inv @ U
            uSu = np.sum(U * SiU, axis=0)
            GU = G @ U
            uGu = np.sum(U * GU, axis=0)

            upper = (gamma_blocks**2) * (
                (1.0 - self.r) * (t_sum / max(n_times, 1))
                + self.r * (uGu / max(n_times, 1))
            )
            lower = (1.0 - self.r) * z_sum + self.r * uSu
            lower = np.maximum(lower, self.eps)
            new_gamma_blocks = np.sqrt(np.maximum(upper, 0.0) / lower)

            # Damping (optional): gamma <- (1-d)*new + d*old
            if self.damping > 0:
                d = float(np.clip(self.damping, 0.0, 0.99))
                new_gamma_blocks = (1.0 - d) * new_gamma_blocks + d * gamma_blocks

            new_gamma_blocks[~np.isfinite(new_gamma_blocks)] = 0.0
            new_gamma_blocks = np.maximum(new_gamma_blocks, 0.0)

            # Pruning (block-level)
            if float(new_gamma_blocks.max(initial=0.0)) > 0.0:
                thresh = self.pruning_thresh * float(new_gamma_blocks.max())
                new_gamma_blocks[new_gamma_blocks < thresh] = 0.0

            gamma_blocks = new_gamma_blocks

            # Residual for noise update (Eq. 18)
            q_hat = np.diag(Sigma_b_inv)  # (n_chans,)
            numer = np.mean(residual**2, axis=1)
            denom = np.maximum(q_hat, self.eps)
            beta_new = np.sqrt(np.maximum(numer, 0.0) / denom)
            beta_new[~np.isfinite(beta_new)] = float(np.max(beta))
            beta = np.maximum(beta_new, self.eps)

            if self.verbose > 1:
                logger.debug(
                    "iter %d: loss=%.4e, nnz_gamma=%d, beta=[%.3e..%.3e]",
                    i_iter,
                    loss,
                    int(np.count_nonzero(gamma_blocks)),
                    float(beta.min()),
                    float(beta.max()),
                )

        # If the loop ended in a worse state (e.g., numerical issues), restore best.
        if best_state is not None:
            _best_loss, gamma_blocks, beta = best_state

        # Final operator W_proj mapping sensors -> cortex sources:
        #   W = CovS L^T Sigma_b^{-1}
        # with CovS = (1-r) diag(w) + r H.T diag(gamma) H  (exact for constant-corr blocks).
        w = np.asarray(geom.H_T @ gamma_blocks, dtype=float).ravel()
        w = np.maximum(w, 0.0)

        np.multiply(L, w[np.newaxis], out=Lw)
        Sigma_b = (1.0 - self.r) * (Lw @ L.T)
        np.multiply(U, gamma_blocks[np.newaxis], out=Ug)
        Sigma_b = Sigma_b + self.r * (Ug @ U.T)
        Sigma_b = Sigma_b + np.diag(beta)
        if self.augment_noise_activity:
            Sigma_b = Sigma_b + v * eye_ch
        Sigma_b = 0.5 * (Sigma_b + Sigma_b.T)
        try:
            Sigma_b_inv = np.linalg.inv(Sigma_b)
        except np.linalg.LinAlgError:
            Sigma_b_inv = np.linalg.pinv(Sigma_b, rcond=self.rank_tol)

        A = (Sigma_b_inv @ L).T  # (n_sources, n_chans) == L^T Sigma_b^{-1}

        # W1 = (1-r) diag(w) A
        W1 = (1.0 - self.r) * (w[:, np.newaxis] * A)

        # W2 = r H.T diag(gamma) H A
        HA = geom.H @ A  # (n_sources, n_chans)
        gamma_HA = gamma_blocks[:, np.newaxis] * HA
        W2 = self.r * (geom.H_T @ gamma_HA)

        W_cortex = (W1 + W2).astype(float, copy=False)
        return W_cortex
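
As a sanity check on the trace identity in the gamma-update comment (Eq. 16) above: with L_i = A_i M_i and M_i M_i^T = C_i(r) = (1-r) I + r 11^T, the lower bound trace(L_i^T Sigma_b^{-1} L_i) reduces to (1-r) * sum_j z_j + r * u_i^T Sigma_b^{-1} u_i. The following standalone snippet (all names local to the check, not library API) verifies this numerically:

import numpy as np

rng = np.random.default_rng(0)
n_chans, k, r = 8, 4, 0.95                   # k = block size

A = rng.standard_normal((n_chans, k))        # leadfield columns of one block
X = rng.standard_normal((n_chans, n_chans))
Sigma_inv = np.linalg.inv(X @ X.T + np.eye(n_chans))  # any SPD "inverse"

C = (1.0 - r) * np.eye(k) + r * np.ones((k, k))  # constant-correlation block
M = np.linalg.cholesky(C)                        # M @ M.T == C

lhs = np.trace(M.T @ A.T @ Sigma_inv @ A @ M)

z = np.sum(A * (Sigma_inv @ A), axis=0)      # z_j = l_j^T Sigma^{-1} l_j
u = A.sum(axis=1)                            # u_i = sum_{j in block} l_j
rhs = (1.0 - r) * z.sum() + r * (u @ Sigma_inv @ u)

assert np.isclose(lhs, rhs)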
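
The residual shortcut used in the noise update (Eq. 18), residual = diag(beta) @ Sigma_b^{-1} @ B, can likewise be checked against the direct form B - L @ E[S] under the model Sigma_b = L Cov(S) L^T + diag(beta). Again a toy sketch, not library code:

import numpy as np

rng = np.random.default_rng(1)
n_chans, n_sources, n_times = 6, 10, 20

L = rng.standard_normal((n_chans, n_sources))
B = rng.standard_normal((n_chans, n_times))
cov_s = np.diag(rng.uniform(0.1, 1.0, n_sources))  # any PSD source covariance
beta = rng.uniform(0.1, 1.0, n_chans)              # diagonal residual noise

Sigma_b = L @ cov_s @ L.T + np.diag(beta)
Sigma_b_inv = np.linalg.inv(Sigma_b)

mean_s = cov_s @ L.T @ Sigma_b_inv @ B             # posterior mean E[S | B]
residual_direct = B - L @ mean_s
residual_shortcut = beta[:, None] * (Sigma_b_inv @ B)

assert np.allclose(residual_direct, residual_shortcut)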

__init__

__init__(
    name: str = "BlockChampagne",
    *,
    adjacency_type: str = "spatial",
    adjacency_distance: float = 0.003,
    r: float = 0.95,
    augment_noise_activity: bool = False,
    v_noise_activity: float = 1.0,
    max_iter: int = 500,
    pruning_thresh: float = 0.001,
    convergence_criterion: float = 1e-08,
    damping: float = 0.0,
    eps: float = 1e-15,
    rank_tol: float = 1e-12,
    **kwargs,
) -> None

make_inverse_operator

make_inverse_operator(
    forward,
    mne_obj=None,
    *args,
    alpha: str | float = "auto",
    noise_cov: Covariance | None = None,
    **kwargs,
)