
Adaptive Patch Source Estimation

Solver ID: APSE

Usage

from invert import Solver

# fwd = ...    (mne.Forward object)
# evoked = ... (mne.Evoked object)

solver = Solver("APSE")
solver.make_inverse_operator(fwd)
stc = solver.apply_inverse_operator(evoked)
stc.plot()

Overview

Hybrid patch-based inverse solver combining subspace source-number estimation, beamforming-style spatial filtering, and sparse Bayesian updates to estimate patch-sized cortical activity.

References

  1. Lukas Hecker 2025, unpublished

API Reference

Bases: BaseSolver

Adaptive Patch Source Estimation (APSE) - A hybrid inverse solution technique that combines the strengths of beamformers, subspace methods, and sparse Bayesian approaches for patch-sized source reconstruction.

This solver:

  1. Uses subspace decomposition to estimate the number of active sources
  2. Applies LCMV-like beamforming with patch constraints
  3. Employs sparse Bayesian learning for source amplitude estimation
  4. Automatically determines patch sizes based on source clustering
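These four stages compress a lot of machinery. As rough orientation, the toy sketch below (not the solver's actual code) reproduces their flow with NumPy on random data; the contiguous-index "patches" and the plain EM variance update are simplified stand-ins for the distance-based patch growing and patch-aware gamma updates used by APSE.

import numpy as np

rng = np.random.default_rng(0)
L = rng.standard_normal((32, 500))   # toy leadfield: 32 channels x 500 dipoles
L /= np.linalg.norm(L, axis=0)       # unit-norm columns, as in the solver
Y = rng.standard_normal((32, 200))   # toy sensor data: 32 channels x 200 samples

# Stage 1: count sources from the covariance eigenvalue spectrum
C = Y @ Y.T / Y.shape[1]
eigvals = np.linalg.eigvalsh(C)[::-1]
n_sources = int(np.searchsorted(np.cumsum(eigvals) / eigvals.sum(), 0.90)) + 1
n_sources = min(n_sources, 5)        # cap for this toy example

# Stage 2: LCMV-like power scan to pick candidate patch centers
C_inv = np.linalg.inv(C + 0.01 * np.trace(C) / len(C) * np.eye(len(C)))
power = 1.0 / np.einsum("ci,cd,di->i", L, C_inv, L)
peaks = np.argsort(power)[::-1][:n_sources]

# Stages 3-4: sparse Bayesian learning on the patch-restricted leadfield
patch = np.unique(np.hstack([np.arange(p, min(p + 10, L.shape[1])) for p in peaks]))
Lp = L[:, patch]
gamma = np.ones(len(patch))          # per-source variance hyperparameters
for _ in range(20):
    Sigma_y = (Lp * gamma) @ Lp.T + 0.1 * np.eye(len(C))   # fixed toy noise floor
    W = (Lp * gamma).T @ np.linalg.inv(Sigma_y)            # posterior-mean filter
    mu = W @ Y                                             # source time courses
    post_var = gamma - np.sum(W * (Lp * gamma).T, axis=1)  # posterior variances
    gamma = np.mean(mu**2, axis=1) + post_var              # EM update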

References

  [1] Van Veen, B. D., & Buckley, K. M. (1988). Beamforming: A versatile approach to spatial filtering. IEEE ASSP Magazine, 5(2), 4-24.
  [2] Wipf, D., & Nagarajan, S. (2009). A unified Bayesian framework for MEG/EEG source imaging. NeuroImage, 44(3), 947-966.
  [3] Mosher, J. C., & Leahy, R. M. (1999). Source localization using recursively applied and projected (RAP) MUSIC. IEEE Transactions on Signal Processing, 47(2), 332-340.

Source code in invert/solvers/hybrids/apse.py
class SolverAPSE(BaseSolver):
    """
    Adaptive Patch Source Estimation (APSE) - A hybrid inverse solution technique
    that combines the strengths of beamformers, subspace methods, and sparse
    Bayesian approaches for patch-sized source reconstruction.

    This solver:
    1. Uses subspace decomposition to estimate the number of active sources
    2. Applies LCMV-like beamforming with patch constraints
    3. Employs sparse Bayesian learning for source amplitude estimation
    4. Automatically determines patch sizes based on source clustering

    References
    ----------
    [1] Van Veen, B. D., & Buckley, K. M. (1988). Beamforming: A versatile
        approach to spatial filtering. IEEE ASSP Magazine, 5(2), 4-24.
    [2] Wipf, D., & Nagarajan, S. (2009). A unified Bayesian framework for MEG/EEG
        source imaging. NeuroImage, 44(3), 947-966.
    [3] Mosher, J. C., & Leahy, R. M. (1999). Source localization using recursively
        applied and projected (RAP) MUSIC. IEEE TSP, 47(2), 332-340.
    """

    meta = SolverMeta(
        acronym="APSE",
        full_name="Adaptive Patch Source Estimation",
        category="Hybrid",
        description=(
            "Hybrid patch-based inverse solver combining subspace source-number estimation, "
            "beamforming-style spatial filtering, and sparse Bayesian updates to estimate "
            "patch-sized cortical activity."
        ),
        references=[
            "Lukas Hecker 2025, unpublished",
        ],
    )

    def __init__(
        self,
        name="Adaptive Patch Source Estimation",
        n_patches="auto",
        patch_size_percentile=30,
        sparsity_param=0.3,
        max_patch_size=50,
        min_patch_size=10,
        reduce_rank=True,
        rank="auto",
        **kwargs,
    ):
        self.name = name
        self.n_patches = n_patches
        self.patch_size_percentile = patch_size_percentile
        self.sparsity_param = sparsity_param
        self.max_patch_size = max_patch_size
        self.min_patch_size = min_patch_size
        super().__init__(reduce_rank=reduce_rank, rank=rank, **kwargs)

    def make_inverse_operator(
        self,
        forward,
        mne_obj,
        *args,
        alpha="auto",
        max_iter=100,
        convergence_tol=1e-4,
        verbose=0,
        **kwargs,
    ):
        """
        Calculate the APSE inverse operator.

        Parameters
        ----------
        forward : mne.Forward
            The mne-python Forward model instance
        mne_obj : [mne.Evoked, mne.Epochs, mne.io.Raw]
            The MNE data object
        alpha : float or 'auto'
            The regularization parameter
        max_iter : int
            Maximum number of iterations for the Bayesian updates
        convergence_tol : float
            Convergence tolerance for the iterative algorithm
        verbose : int
            Verbosity level

        Return
        ------
        self : object
            Returns itself for convenience
        """
        super().make_inverse_operator(forward, *args, alpha=alpha, **kwargs)

        # Get data and leadfield
        data = self.unpack_data_obj(mne_obj)
        leadfield = self.leadfield
        n_chans, n_dipoles = leadfield.shape

        # Regularization scale should match the sensor-space covariance matrices
        # that are diagonal-loaded inside this solver (Sigma_y + alpha * I).
        data_cov = self.data_covariance(data, center=True, ddof=1)
        self.get_alphas(reference=data_cov)

        # Normalize leadfield columns
        leadfield_norm = leadfield / np.linalg.norm(leadfield, axis=0)

        # Step 1: Estimate number of sources using subspace decomposition
        n_sources = self._estimate_n_sources(data, leadfield_norm)
        if verbose > 0:
            logger.info(f"Estimated number of active sources: {n_sources}")

        # Step 2: Get source positions
        pos = pos_from_forward(self.forward)

        # Step 3: Create patches based on spatial proximity
        patches = self._create_adaptive_patches(
            data, leadfield_norm, pos, n_sources, verbose
        )
        self.patches = patches

        # Step 4: Create inverse operators for different regularization values
        inverse_operators = []

        for alpha in self.alphas:
            # Compute patch-constrained inverse operator
            inverse_op = self._compute_patch_inverse(
                data, leadfield, patches, alpha, max_iter, convergence_tol, verbose
            )
            inverse_operators.append(inverse_op)

        self.inverse_operators = [
            InverseOperator(op, self.name) for op in inverse_operators
        ]
        return self

    def _estimate_n_sources(self, data, leadfield):
        """
        Estimate the number of active sources using eigenvalue decomposition
        of the data covariance matrix.

        This implementation is specialized for patch-based source estimation
        and uses more conservative thresholds than the standard method.
        """
        if self.n_patches != "auto":
            return self.n_patches

        # Compute data covariance
        C = data @ data.T

        # SVD decomposition
        _, S, _ = np.linalg.svd(C, full_matrices=False)

        # Normalize eigenvalues
        S_norm = S / S.max()

        # Find elbow using multiple criteria
        # Criterion 1: Eigenvalue drop-off - more sensitive for patch sources
        diff_S = np.abs(np.diff(S_norm))
        drop_idx = np.where(diff_S < 0.02)[0]  # Slightly higher threshold for patches
        n_drop = drop_idx[0] + 1 if len(drop_idx) > 0 else len(S)

        # Criterion 2: L-curve method using base class method
        n_comp = min(len(S), 15)  # Limit to the first 15 components for patches
        n_lcurve = self.get_comps_L(S[:n_comp])

        # Criterion 3: Cumulative energy - lower threshold for patches
        cumsum_S = np.cumsum(S) / np.sum(S)
        n_energy = np.where(cumsum_S > 0.90)[0][0] + 1  # 90% instead of 95%

        # Combine criteria with conservative bias toward fewer sources for patches
        estimates = [n_drop, n_lcurve, n_energy]
        n_sources = int(
            np.percentile(estimates, 33)
        )  # Use 33rd percentile instead of median

        # Ensure reasonable bounds
        n_sources = np.clip(n_sources, 1, min(20, len(S) // 2))

        return n_sources

    def _create_adaptive_patches(self, data, leadfield, pos, n_sources, verbose):
        """
        Create adaptive patches based on beamformer peaks and spatial clustering.
        """
        n_chans, n_dipoles = leadfield.shape

        # Step 1: Compute initial source activity using LCMV-like beamformer
        C = data @ data.T
        reg = np.trace(C) / n_chans * 0.01  # Small regularization
        C_reg = C + reg * np.eye(n_chans)
        C_inv = np.linalg.inv(C_reg)

        # Beamformer spatial filter for each dipole
        source_power = np.zeros(n_dipoles)
        for i in range(n_dipoles):
            l = leadfield[:, i : i + 1]
            w = C_inv @ l / (l.T @ C_inv @ l)
            source_power[i] = np.real(w.T @ C @ w).item()

        # Step 2: Find peaks using adaptive thresholding
        # Normalize source power
        source_power_norm = source_power / source_power.max()

        # Dynamic threshold based on noise floor estimation - more aggressive for patches
        sorted_power = np.sort(source_power_norm)
        noise_floor = np.median(
            sorted_power[: len(sorted_power) // 3]
        )  # Use lower third instead of half
        threshold = noise_floor + 0.2 * (
            1 - noise_floor
        )  # Lower threshold for patch detection

        # Find peaks
        peak_indices = self._find_adaptive_peaks(
            source_power_norm, threshold, pos, n_sources
        )

        if verbose > 0:
            logger.info(f"Found {len(peak_indices)} peak locations")

        # Step 3: Create patches around peaks
        patches = self._grow_patches(peak_indices, pos, source_power_norm)

        return patches

    def _find_adaptive_peaks(self, source_power, threshold, pos, n_sources):
        """
        Find peak locations with adaptive spatial separation.
        """
        # Start with high threshold and decrease if needed
        peaks = []
        current_threshold = threshold

        while len(peaks) < n_sources and current_threshold > 0.1:
            candidates = np.where(source_power > current_threshold)[0]

            if len(candidates) == 0:
                current_threshold *= 0.8
                continue

            # Sort by power
            candidates = candidates[np.argsort(source_power[candidates])[::-1]]

            # Select peaks with minimum spatial separation
            min_distance = self._estimate_min_distance(pos)
            peaks = []

            for cand in candidates:
                # Check distance to existing peaks
                if len(peaks) == 0:
                    peaks.append(cand)
                else:
                    distances = cdist(pos[cand : cand + 1], pos[peaks])
                    if np.min(distances) > min_distance:
                        peaks.append(cand)

                if len(peaks) >= n_sources:
                    break

            current_threshold *= 0.8

        return np.array(peaks[:n_sources])

    def _estimate_min_distance(self, pos):
        """
        Estimate minimum distance between sources based on spatial distribution.
        """
        # Sample random pairs to estimate typical distances
        n_samples = min(1000, len(pos))
        indices = np.random.choice(len(pos), n_samples, replace=False)
        sample_pos = pos[indices]

        # Compute pairwise distances
        distances = cdist(sample_pos, sample_pos)
        distances[distances == 0] = np.inf  # Ignore self-distances

        # Use percentile of nearest neighbor distances
        min_distances = np.min(distances, axis=1)
        min_distance = np.percentile(min_distances, self.patch_size_percentile)

        return (
            min_distance * 1.5
        )  # Reduced scaling for patch sources (allow closer patches)

    def _grow_patches(self, peak_indices, pos, source_power):
        """
        Grow patches around peak locations based on spatial proximity and power.
        """
        patches = []
        used_vertices = set()

        # Estimate adaptive patch size with spatial correlation awareness
        patch_dists = []
        for peak in peak_indices:
            distances = cdist(pos[peak : peak + 1], pos).flatten()
            # Use larger percentile and add correlation-based scaling
            base_dist = np.percentile(
                distances[distances > 0], self.patch_size_percentile
            )
            # Scale based on local source power gradient for better patch extent
            local_indices = np.where(distances <= base_dist * 2)[0]
            if len(local_indices) > 1:
                local_power_std = np.std(source_power[local_indices])
                # Higher variability suggests larger patch needed
                scale_factor = 1.0 + 0.5 * local_power_std
                patch_dists.append(base_dist * scale_factor)
            else:
                patch_dists.append(base_dist)

        # Create patches
        for i, peak in enumerate(peak_indices):
            if peak in used_vertices:
                continue

            # Find vertices within adaptive distance
            distances = cdist(pos[peak : peak + 1], pos).flatten()
            patch_vertices = np.where(distances <= patch_dists[i])[0]

            # Apply power-based refinement with spatial weighting for patch sources
            patch_power = source_power[patch_vertices]
            patch_distances = distances[patch_vertices]

            # Use distance-weighted threshold that's more lenient for nearby vertices
            distance_weights = np.exp(-patch_distances / (patch_dists[i] * 0.5))
            adaptive_threshold = 0.05 * source_power[peak] * (1 + distance_weights)
            patch_vertices = patch_vertices[patch_power > adaptive_threshold]

            # Ensure patch size constraints
            if len(patch_vertices) > self.max_patch_size:
                # Keep closest vertices
                patch_distances = distances[patch_vertices]
                sorted_idx = np.argsort(patch_distances)
                patch_vertices = patch_vertices[sorted_idx[: self.max_patch_size]]
            elif len(patch_vertices) < self.min_patch_size:
                # Expand to include nearest neighbors
                sorted_idx = np.argsort(distances)
                patch_vertices = sorted_idx[: self.min_patch_size]

            # Mark vertices as used
            used_vertices.update(patch_vertices)
            patches.append(patch_vertices)

        return patches

    def _compute_patch_inverse(
        self, data, leadfield, patches, alpha, max_iter, convergence_tol, verbose
    ):
        """
        Compute the patch-constrained inverse operator using iterative
        sparse Bayesian learning.
        """
        n_chans, n_dipoles = leadfield.shape
        n_time = data.shape[1]

        # Initialize patch leadfields
        patch_leadfields = []
        patch_indices = []

        for patch in patches:
            if len(patch) > 0:
                patch_leadfield = leadfield[:, patch]
                patch_leadfields.append(patch_leadfield)
                patch_indices.extend(patch.tolist())

        # Construct combined patch leadfield
        L_patch = (
            np.hstack(patch_leadfields) if patch_leadfields else np.zeros((n_chans, 1))
        )
        n_patch_sources = L_patch.shape[1]

        # Initialize hyperparameters with better noise estimation
        gamma = np.ones(n_patch_sources)
        # Improved noise variance estimation based on residual after simple beamforming
        C = data @ data.T / n_time
        signal_subspace = min(10, n_chans // 2)  # Estimate signal subspace size
        eigenvals = np.linalg.eigvalsh(C)[::-1]  # Symmetric-safe, sorted descending
        # Noise variance from smallest eigenvalues
        noise_var = (
            np.mean(eigenvals[signal_subspace:])
            if len(eigenvals) > signal_subspace
            else eigenvals[-1] * 0.1
        )

        # Iterative sparse Bayesian learning
        for iteration in range(max_iter):
            gamma_old = gamma.copy()

            # E-step: Compute posterior mean and covariance
            Gamma = np.diag(gamma)
            Sigma_y = L_patch @ Gamma @ L_patch.T + noise_var * np.eye(n_chans)

            try:
                Sigma_y_inv = np.linalg.inv(Sigma_y + alpha * np.eye(n_chans))
            except np.linalg.LinAlgError:
                # Add more regularization if singular
                Sigma_y_inv = np.linalg.inv(Sigma_y + (alpha + 0.1) * np.eye(n_chans))

            # Posterior covariance
            Sigma_post = Gamma - Gamma @ L_patch.T @ Sigma_y_inv @ L_patch @ Gamma

            # Posterior mean
            mu_post = Gamma @ L_patch.T @ Sigma_y_inv @ data

            # M-step: Update hyperparameters with patch-aware updates
            for j in range(n_patch_sources):
                data_term = np.real(
                    np.linalg.norm(mu_post[j, :]) ** 2 / n_time + Sigma_post[j, j]
                )
                gamma[j] = (
                    1 - self.sparsity_param
                ) * data_term + self.sparsity_param * gamma[j]

            # Update noise variance with regularization to prevent over-fitting
            residual = data - L_patch @ mu_post
            residual_var = np.trace(residual @ residual.T) / (n_chans * n_time)
            # Blend with previous estimate for stability
            noise_var = 0.8 * noise_var + 0.2 * residual_var
            # Ensure minimum noise level
            noise_var = max(noise_var, 1e-6)

            # Check convergence
            if (
                np.linalg.norm(gamma - gamma_old) / np.linalg.norm(gamma_old)
                < convergence_tol
            ):
                if verbose > 0:
                    logger.info(f"Converged after {iteration + 1} iterations")
                break

        # Compute final inverse operator
        Gamma = np.diag(gamma)
        Sigma_y = L_patch @ Gamma @ L_patch.T + noise_var * np.eye(n_chans)
        Sigma_y_inv = np.linalg.inv(Sigma_y + alpha * np.eye(n_chans))

        # Create sparse inverse operator
        W_patch = np.real(Gamma @ L_patch.T @ Sigma_y_inv)

        # Map back to full source space with continuous spatial interpolation
        W = np.zeros((n_dipoles, n_chans))
        if len(patch_indices) > 0:
            W[patch_indices, :] = W_patch

        # Apply continuous spatial smoothing instead of discrete patch smoothing
        W_smooth = self._apply_continuous_spatial_smoothing(
            W, patches, pos_from_forward(self.forward)
        )

        return W_smooth

    def _smooth_within_patches(self, W, patches):
        """
        Apply smoothing within each patch to ensure coherent patch activations.
        """
        W_smooth = W.copy()

        for patch in patches:
            if len(patch) > 1:
                # Enhanced spatial smoothing for patch coherence
                patch_weights = W[patch, :]

                # Use power-weighted averaging for better patch coherence
                patch_norms = np.linalg.norm(patch_weights, axis=1)
                if np.sum(patch_norms) > 0:
                    weights = patch_norms / np.sum(patch_norms)
                    weighted_mean = np.average(patch_weights, axis=0, weights=weights)
                else:
                    weighted_mean = np.mean(patch_weights, axis=0)

                # More aggressive blending for patch sources
                blend_factor = 0.5  # 50% smoothing for better patch coherence
                for _i, vertex in enumerate(patch):
                    W_smooth[vertex, :] = (1 - blend_factor) * W[
                        vertex, :
                    ] + blend_factor * weighted_mean

        return W_smooth

    def _apply_continuous_spatial_smoothing(self, W, patches, pos):
        """
        Apply continuous spatial smoothing that extends beyond discrete patch boundaries.
        This creates smooth spatial gradients similar to Champagne's natural smoothness.
        """
        from scipy.spatial.distance import cdist

        W_smooth = W.copy()
        n_dipoles = W.shape[0]

        # For each patch, create smooth spatial gradients extending outward
        for patch in patches:
            if len(patch) == 0:
                continue

            # Get patch center and activity
            patch_pos = pos[patch]
            patch_weights = W[patch, :]

            # Find patch centroid
            patch_activity = np.linalg.norm(patch_weights, axis=1)
            if np.sum(patch_activity) > 0:
                centroid_weights = patch_activity / np.sum(patch_activity)
                centroid_pos = np.average(patch_pos, axis=0, weights=centroid_weights)
            else:
                centroid_pos = np.mean(patch_pos, axis=0)

            # Compute distances from centroid to all source locations
            distances = cdist(centroid_pos.reshape(1, -1), pos).flatten()

            # Create smooth spatial kernel extending beyond patch
            max_patch_dist = np.max(distances[patch]) if len(patch) > 0 else 1.0
            smooth_radius = max_patch_dist * 2.0  # Extend beyond patch boundaries

            # Gaussian-like smoothing kernel
            spatial_weights = np.exp(-((distances / smooth_radius) ** 2))

            # Get representative patch activity (weighted average)
            if np.sum(patch_activity) > 0:
                representative_activity = np.average(
                    patch_weights, axis=0, weights=patch_activity
                )
            else:
                representative_activity = np.mean(patch_weights, axis=0)

            # Apply smooth spatial distribution
            for i in range(n_dipoles):
                # Blend existing activity with smooth patch influence
                if i not in patch:  # Only modify locations outside the patch
                    blend_factor = spatial_weights[i] * 0.3  # 30% influence max
                    W_smooth[i, :] = (1 - blend_factor) * W_smooth[
                        i, :
                    ] + blend_factor * representative_activity

        return W_smooth

    def apply_inverse_operator(self, mne_obj) -> mne.SourceEstimate:
        """
        Apply the inverse operator with patch-based constraints.

        Parameters
        ----------
        mne_obj : [mne.Evoked, mne.Epochs, mne.io.Raw]
            The MNE data object

        Return
        ------
        stc : mne.SourceEstimate
            The source estimate with patch structure
        """
        # Use parent class method with potential patch post-processing
        stc = super().apply_inverse_operator(mne_obj)

        # Optional: Apply continuous spatial smoothing to the solution
        if hasattr(self, "patches") and self.patches:
            stc = self._apply_continuous_solution_smoothing(stc)

        return stc

    def _apply_patch_smoothing(self, stc):
        """
        Apply post-hoc smoothing within detected patches.
        """
        data = stc.data.copy()

        for patch in self.patches:
            if len(patch) > 1:
                # Apply mild smoothing within patch
                patch_data = data[patch, :]

                # Spatial smoothing using Gaussian weights based on activation strength
                patch_power = np.linalg.norm(patch_data, axis=1)
                if patch_power.sum() > 0:
                    weights = patch_power / patch_power.sum()
                    weighted_mean = np.average(patch_data, axis=0, weights=weights)

                    # More aggressive blending for patch sources
                    blend = 0.3  # 30% smoothing for better patch coherence
                    for _i, vertex in enumerate(patch):
                        data[vertex, :] = (1 - blend) * data[
                            vertex, :
                        ] + blend * weighted_mean

        # Create new source estimate with smoothed data
        stc_smooth = mne.SourceEstimate(
            data=data,
            vertices=stc.vertices,
            tmin=stc.tmin,
            tstep=stc.tstep,
            subject=stc.subject,
        )

        return stc_smooth

    def _apply_continuous_solution_smoothing(self, stc):
        """
        Apply continuous spatial smoothing to the final solution, creating smooth
        gradients similar to Champagne's natural spatial distributions.
        """
        from scipy.spatial.distance import cdist

        data = stc.data.copy()
        pos = pos_from_forward(self.forward)

        # Create smooth spatial distributions around each patch
        for patch in self.patches:
            if len(patch) <= 1:
                continue

            # Get patch characteristics
            patch_data = data[patch, :]
            patch_pos = pos[patch]

            # Find patch centroid weighted by activity
            patch_power = np.linalg.norm(patch_data, axis=1)
            if patch_power.sum() > 0:
                weights = patch_power / patch_power.sum()
                centroid_pos = np.average(patch_pos, axis=0, weights=weights)
                representative_signal = np.average(patch_data, axis=0, weights=weights)
            else:
                centroid_pos = np.mean(patch_pos, axis=0)
                representative_signal = np.mean(patch_data, axis=0)

            # Compute distances from centroid to all sources
            distances = cdist(centroid_pos.reshape(1, -1), pos).flatten()

            # Create extended smooth kernel (larger than discrete patch)
            max_patch_dist = np.max(distances[patch])
            smooth_radius = max_patch_dist * 1.5  # Extend 50% beyond patch

            # Exponential decay kernel for smooth spatial gradients
            spatial_kernel = np.exp(-((distances / smooth_radius) ** 2))

            # Apply smooth spatial distribution to nearby sources
            for i in range(len(pos)):
                if (
                    i not in patch and spatial_kernel[i] > 0.1
                ):  # Threshold for efficiency
                    blend_factor = spatial_kernel[i] * 0.4  # Up to 40% influence
                    data[i, :] = (1 - blend_factor) * data[
                        i, :
                    ] + blend_factor * representative_signal

        # Create new source estimate with continuous spatial structure
        stc_continuous = mne.SourceEstimate(
            data=data,
            vertices=stc.vertices,
            tmin=stc.tmin,
            tstep=stc.tstep,
            subject=stc.subject,
        )

        return stc_continuous

__init__

__init__(
    name="Adaptive Patch Source Estimation",
    n_patches="auto",
    patch_size_percentile=30,
    sparsity_param=0.3,
    max_patch_size=50,
    min_patch_size=10,
    reduce_rank=True,
    rank="auto",
    **kwargs,
)
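The constructor only stores these patch hyperparameters; all data-dependent work happens in make_inverse_operator. A minimal construction sketch follows, assuming the class can be imported from the module path shown under "Source code" (this path may differ in your install):

from invert.solvers.hybrids.apse import SolverAPSE

# Fix the number of patches instead of estimating it, and allow smaller,
# sparser patches; remaining keyword arguments are passed on to BaseSolver.
solver = SolverAPSE(
    n_patches=5,
    sparsity_param=0.5,
    min_patch_size=5,
    max_patch_size=30,
)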

make_inverse_operator

make_inverse_operator(
    forward,
    mne_obj,
    *args,
    alpha="auto",
    max_iter=100,
    convergence_tol=0.0001,
    verbose=0,
    **kwargs,
)

Calculate the APSE inverse operator.

Parameters:

    forward : mne.Forward (required)
        The mne-python Forward model instance
    mne_obj : mne.Evoked | mne.Epochs | mne.io.Raw (required)
        The MNE data object
    alpha : float or 'auto' (default: 'auto')
        The regularization parameter
    max_iter : int (default: 100)
        Maximum number of iterations for the Bayesian updates
    convergence_tol : float (default: 1e-4)
        Convergence tolerance for the iterative algorithm
    verbose : int (default: 0)
        Verbosity level

Returns:

    self : object
        Returns itself for convenience
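Continuing the Usage example above (fwd and evoked defined there), a call that tightens the iteration controls might look like the sketch below; note that this method takes the data object in addition to the forward model, since the patches are estimated from the data:

solver = Solver("APSE")
solver.make_inverse_operator(
    fwd,
    evoked,
    alpha="auto",          # select regularization automatically
    max_iter=200,          # allow more sparse Bayesian iterations
    convergence_tol=1e-5,  # stricter stopping criterion
    verbose=1,             # log the source count and convergence
)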


apply_inverse_operator

apply_inverse_operator(mne_obj) -> mne.SourceEstimate

Apply the inverse operator with patch-based constraints.

Parameters:

    mne_obj : mne.Evoked | mne.Epochs | mne.io.Raw (required)
        The MNE data object

Returns:

    stc : mne.SourceEstimate
        The source estimate with patch structure
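Continuing that example, the patches detected during make_inverse_operator remain available on the solver and drive the optional post-hoc smoothing, so they can be inspected after applying the operator:

stc = solver.apply_inverse_operator(evoked)

# One entry per detected patch, each holding the vertex indices of that patch
print(len(solver.patches), [len(p) for p in solver.patches])

stc.plot()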
