"""
GPU-ACCELERATED StarGAlgebra CLASS
LH & SU 2026
Python conversion of MATLAB StarGAlgebra class for group-based
convolutional tensor algebra operations.
"""
import numpy as np
from numpy.fft import fft, ifft, fft2, ifft2
from scipy.linalg import dft, svd
from itertools import permutations
from typing import Optional, Tuple, List, Dict, Union, Any
import warnings
# Try to import CuPy for GPU support
try:
import cupy as cp
CUPY_AVAILABLE = True
except ImportError:
CUPY_AVAILABLE = False
cp = None
class StarGAlgebra:
"""
StarGAlgebra: Group-based convolutional tensor algebra.
Supports various finite groups including cyclic, dihedral, symmetric,
Klein-4, quaternion, and direct products.
Parameters
----------
group_type : str
Type of group: 'cyclic', 'dihedral', 'symmetric', 'klein4',
'quaternion', 'product', or 'table'
group_param : optional
Parameter for group initialization (e.g., order for cyclic groups)
"""
def __init__(self, group_type: str, group_param: Any = None):
# Properties
self.G: np.ndarray = None # Multiplication table
self.n: int = 0 # Group order
self.F: np.ndarray = None # Fourier matrix
self.Finv: np.ndarray = None # Inverse Fourier matrix
self.irrep_dims: List[int] = [] # Irreducible representation dimensions
self.conv_tensor: np.ndarray = None # Convolution tensor
self.inv_table: np.ndarray = None # Inverse table
self.use_parallel: bool = False
self.use_gpu: bool = False
self.is_abelian: bool = False
self.is_cyclic: bool = False
self.identity_idx: int = 0
# GPU arrays (None if not using GPU)
self.G_gpu = None
self.conv_tensor_gpu = None
self.inv_table_gpu = None
self.F_gpu = None
self.Finv_gpu = None
# Initialize based on group type
group_type_lower = group_type.lower()
if group_type_lower == 'cyclic':
self._init_cyclic(group_param)
elif group_type_lower == 'dihedral':
self._init_dihedral(group_param)
elif group_type_lower == 'symmetric':
self._init_symmetric(group_param)
elif group_type_lower == 'klein4':
self._init_klein4()
elif group_type_lower == 'quaternion':
self._init_quaternion()
elif group_type_lower == 'product':
self._init_product(group_param)
elif group_type_lower == 'table':
self._init_from_table(group_param)
else:
raise ValueError(f"Unknown group type: {group_type}")
# Build auxiliary structures
self._build_convolution_tensor()
self._build_inverse_table()
self._find_identity()
self.is_abelian = self._check_abelian()
# =========================================================================
# GPU Methods
# =========================================================================
def enable_gpu(self) -> 'StarGAlgebra':
"""Enable GPU acceleration using CuPy."""
if CUPY_AVAILABLE:
try:
device = cp.cuda.Device()
self.use_gpu = True
self._init_gpu_arrays()
print(f"GPU enabled: {device}")
except Exception as e:
warnings.warn(f"Failed to enable GPU: {e}")
else:
warnings.warn("CuPy not available. Install with: pip install cupy")
return self
def _init_gpu_arrays(self):
"""Initialize GPU arrays."""
if self.use_gpu and CUPY_AVAILABLE:
self.G_gpu = cp.asarray(self.G.astype(np.int32))
self.conv_tensor_gpu = cp.asarray(self.conv_tensor)
self.inv_table_gpu = cp.asarray(self.inv_table.astype(np.int32))
self.F_gpu = cp.asarray(self.F)
self.Finv_gpu = cp.asarray(self.Finv)
def disable_gpu(self) -> 'StarGAlgebra':
"""Disable GPU acceleration."""
self.use_gpu = False
self.G_gpu = None
self.conv_tensor_gpu = None
self.inv_table_gpu = None
self.F_gpu = None
self.Finv_gpu = None
return self
def _to_gpu(self, arr: np.ndarray) -> Any:
"""Move array to GPU if GPU is enabled."""
if self.use_gpu and CUPY_AVAILABLE:
return cp.asarray(arr)
return arr
def _to_cpu(self, arr: Any) -> np.ndarray:
"""Move array to CPU."""
if self.use_gpu and CUPY_AVAILABLE and isinstance(arr, cp.ndarray):
return cp.asnumpy(arr)
return np.asarray(arr)
def _get_array_module(self, arr: Any):
"""Get the array module (numpy or cupy) for the given array."""
if self.use_gpu and CUPY_AVAILABLE:
return cp.get_array_module(arr)
return np
# =========================================================================
# Core Setup Methods
# =========================================================================
def _find_identity(self):
"""Find the identity element index."""
for e in range(self.n):
is_id = True
for a in range(self.n):
if self.G[e, a] != a or self.G[a, e] != a:
is_id = False
break
if is_id:
self.identity_idx = e
return
raise ValueError("No identity element found")
def _build_inverse_table(self):
"""Build the inverse element lookup table."""
self.inv_table = np.zeros(self.n, dtype=np.int32)
# Find identity element
e = 0
for candidate in range(self.n):
if np.all(self.G[candidate, :] == np.arange(self.n)):
e = candidate
break
# Find inverse for each element
for a in range(self.n):
for b in range(self.n):
if self.G[a, b] == e:
self.inv_table[a] = b
break
def _build_convolution_tensor(self):
"""Build the group convolution tensor."""
ng = self.n
self.conv_tensor = np.zeros((ng, ng, ng))
for a in range(ng):
for b in range(ng):
c = self.G[a, b]
self.conv_tensor[a, b, c] = 1
def _check_abelian(self) -> bool:
"""Check if the group is abelian."""
return np.array_equal(self.G, self.G.T)
# =========================================================================
# Convolution Methods
# =========================================================================
def convolve_direct(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Direct group convolution using convolution tensor."""
a = a.flatten()
b = b.flatten()
if self.use_gpu and CUPY_AVAILABLE:
a = cp.asarray(a)
b = cp.asarray(b)
T = self.conv_tensor_gpu
else:
T = self.conv_tensor
xp = self._get_array_module(a)
ab = xp.outer(a, b)
c = xp.sum(xp.sum(T * ab[:, :, np.newaxis], axis=0), axis=0)
return self._to_cpu(c)
def convolve_inverse(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Group convolution using inverse formulation."""
a = a.flatten()
b = b.flatten()
c = np.zeros(self.n)
for c_idx in range(self.n):
for g in range(self.n):
g_inv = self.inv_table[g]
g_inv_c = self.G[g_inv, c_idx]
c[c_idx] += a[g] * b[g_inv_c]
return c
def convolve(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""
Optimized group convolution.
Uses FFT for cyclic groups, direct method otherwise.
"""
a = a.flatten()
b = b.flatten()
if self.is_cyclic:
if self.use_gpu and CUPY_AVAILABLE:
a = cp.asarray(a)
b = cp.asarray(b)
c = cp.fft.ifft(cp.fft.fft(a) * cp.fft.fft(b))
c = cp.asnumpy(c)
else:
c = ifft(fft(a) * fft(b))
if np.isreal(a).all() and np.isreal(b).all():
c = np.real(c)
else:
c = self.convolve_direct(a, b)
return c
# =========================================================================
# Star-G Product Methods
# =========================================================================
def star_g_direct(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
"""Direct computation of ★_G product."""
l, m, ng = A.shape
_, p, _ = B.shape
if self.use_gpu and CUPY_AVAILABLE:
A = cp.asarray(A)
B = cp.asarray(B)
T = self.conv_tensor_gpu
C = cp.zeros((l, p, ng))
xp = cp
else:
T = self.conv_tensor
C = np.zeros((l, p, ng))
xp = np
for i in range(l):
for j in range(p):
for k in range(m):
a_ik = A[i, k, :]
b_kj = B[k, j, :]
ab = xp.outer(a_ik, b_kj)
conv_result = xp.sum(xp.sum(T * ab[:, :, np.newaxis], axis=0), axis=0)
C[i, j, :] += conv_result
return self._to_cpu(C)
def star_g_fourier(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
"""FFT-based ★_G product for cyclic groups."""
l, m, n = A.shape
_, p, _ = B.shape
if self.use_gpu and CUPY_AVAILABLE:
A = cp.asarray(A)
B = cp.asarray(B)
xp = cp
else:
xp = np
Ahat = xp.fft.fft(A, axis=2)
Bhat = xp.fft.fft(B, axis=2)
# Matrix multiply for each frequency slice
Chat = xp.zeros((l, p, n), dtype=Ahat.dtype)
for k in range(n):
Chat[:, :, k] = Ahat[:, :, k] @ Bhat[:, :, k]
C = xp.fft.ifft(Chat, axis=2)
C = self._to_cpu(C)
if np.isreal(A).all() and np.isreal(B).all():
C = np.real(C)
return C
def star_g(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
"""
★_G product - Convolutional tensor product.
Parameters
----------
A : ndarray of shape (l, m, n)
B : ndarray of shape (m, p, n)
Returns
-------
C : ndarray of shape (l, p, n)
"""
# Handle 2D inputs
if A.ndim == 2:
A = A[:, :, np.newaxis]
if A.shape[2] == 1 and self.n > 1:
A = np.tile(A, (1, 1, self.n))
if B.ndim == 2:
B = B[:, :, np.newaxis]
if B.shape[2] == 1 and self.n > 1:
B = np.tile(B, (1, 1, self.n))
_, m, n_A = A.shape
m2, _, n_B = B.shape
assert m == m2, "Inner dimensions must match"
assert n_A == self.n and n_B == self.n, "Mode-3 must equal group order"
if self.is_cyclic:
return self.star_g_fourier(A, B)
else:
return self.star_g_direct(A, B)
def star_g_batch(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
"""Batch ★_G product for 4D tensors."""
l, m, ng, batch = A.shape
_, p, _, _ = B.shape
if self.use_gpu and CUPY_AVAILABLE and self.is_cyclic:
A = cp.asarray(A)
B = cp.asarray(B)
Ahat = cp.fft.fft(A, axis=2)
Bhat = cp.fft.fft(B, axis=2)
Chat = cp.zeros((l, p, ng, batch), dtype=Ahat.dtype)
for b_idx in range(batch):
for k in range(ng):
Chat[:, :, k, b_idx] = Ahat[:, :, k, b_idx] @ Bhat[:, :, k, b_idx]
C = cp.asnumpy(np.real(cp.fft.ifft(Chat, axis=2)))
else:
C = np.zeros((l, p, ng, batch))
for b_idx in range(batch):
C[:, :, :, b_idx] = self.star_g(A[:, :, :, b_idx], B[:, :, :, b_idx])
return C
def star_g_old(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
"""
Original ★_G product implementation using group Fourier transform.
Uses the full convolution tensor structure.
"""
# Handle 2D inputs
if A.ndim == 2:
A = A[:, :, np.newaxis]
if A.shape[2] == 1 and self.n > 1:
A = np.tile(A, (1, 1, self.n))
if B.ndim == 2:
B = B[:, :, np.newaxis]
if B.shape[2] == 1 and self.n > 1:
B = np.tile(B, (1, 1, self.n))
l, m, _ = A.shape
m2, p, _ = B.shape
n = self.n
assert m == m2, "Inner dimensions must match"
# Transform along mode-3 using group Fourier matrix
A_reshape = A.transpose(2, 0, 1).reshape(n, -1) # n x (l*m)
Ahat_reshape = self.F.conj().T @ A_reshape # n x (l*m)
Ahat = Ahat_reshape.reshape(n, l, m).transpose(1, 2, 0) # l x m x n
B_reshape = B.transpose(2, 0, 1).reshape(n, -1) # n x (m*p)
Bhat_reshape = self.F.conj().T @ B_reshape # n x (m*p)
Bhat = Bhat_reshape.reshape(n, m, p).transpose(1, 2, 0) # m x p x n
# Multiply in transform domain with Peter-Weyl structure
Chat = np.zeros((l, p, n), dtype=complex)
for k in range(n):
Ck_slice = np.zeros((l, p), dtype=complex)
for x in range(n):
for y in range(n):
if self.conv_tensor[x, y, k] != 0:
Ck_slice += self.conv_tensor[x, y, k] * (Ahat[:, :, x] @ Bhat[:, :, y])
Chat[:, :, k] = Ck_slice
# Transform back
Chat_reshape = Chat.transpose(2, 0, 1).reshape(n, -1) # n x (l*p)
C_reshape = self.Finv.conj().T @ Chat_reshape # n x (l*p)
C = C_reshape.reshape(n, l, p).transpose(1, 2, 0) # l x p x n
if np.isreal(A).all() and np.isreal(B).all():
C = np.real(C)
return C
# =========================================================================
# Conjugate Transpose
# =========================================================================
def conjugate_transpose(self, A: np.ndarray) -> np.ndarray:
"""Compute ★_G conjugate transpose."""
m, p, n = A.shape
Ah = np.zeros((p, m, n), dtype=A.dtype)
for i in range(p):
for j in range(m):
for g in range(n):
g_inv = self.inv_table[g]
Ah[i, j, g] = np.conj(A[j, i, g_inv])
return Ah
def conjugate_transpose_fast(self, A: np.ndarray) -> np.ndarray:
"""Fast ★_G conjugate transpose using vectorized operations."""
Ah = np.conj(A).transpose(1, 0, 2)
Ah = Ah[:, :, self.inv_table]
return Ah
# =========================================================================
# SVD Methods
# =========================================================================
def star_g_svd(self, A: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute ★_G-SVD decomposition.
Parameters
----------
A : ndarray of shape (l, m, n)
Returns
-------
U : ndarray of shape (l, min(l,m), n)
S : ndarray of shape (min(l,m), min(l,m), n)
V : ndarray of shape (m, min(l,m), n)
"""
l, m, n = A.shape
minlm = min(l, m)
if self.is_cyclic:
if self.use_gpu and CUPY_AVAILABLE:
A = cp.asarray(A)
Ahat = cp.fft.fft(A, axis=2)
Ahat = cp.asnumpy(Ahat)
else:
Ahat = fft(A, axis=2)
Uhat = np.zeros((l, minlm, n), dtype=complex)
Shat = np.zeros((minlm, minlm, n), dtype=complex)
Vhat = np.zeros((m, minlm, n), dtype=complex)
for i in range(n):
slice_i = Ahat[:, :, i]
Ui, Si, Vhi = svd(slice_i, full_matrices=False)
k = min(Ui.shape[1], minlm)
Uhat[:, :k, i] = Ui[:, :k]
Shat[:k, :k, i] = np.diag(Si[:k])
Vhat[:, :k, i] = Vhi[:k, :].T
U = ifft(Uhat, axis=2)
S = ifft(Shat, axis=2)
V = ifft(Vhat, axis=2)
else:
U = np.zeros((l, minlm, n), dtype=complex)
S = np.zeros((minlm, minlm, n), dtype=complex)
V = np.zeros((m, minlm, n), dtype=complex)
for g in range(n):
Ug, Sg, Vhg = svd(A[:, :, g], full_matrices=False)
k = min(Ug.shape[1], minlm)
U[:, :k, g] = Ug[:, :k]
S[:k, :k, g] = np.diag(Sg[:k])
V[:, :k, g] = Vhg[:k, :].T
if np.isreal(A).all():
U = np.real(U)
S = np.real(S)
V = np.real(V)
return U, S, V
def truncate(self, A: np.ndarray, k: int) -> np.ndarray:
"""Truncated ★_G-SVD reconstruction."""
l, m, n = A.shape
k = min(k, min(l, m))
if self.is_cyclic:
if self.use_gpu and CUPY_AVAILABLE:
A = cp.asarray(A)
Ahat = cp.fft.fft(A, axis=2)
Ahat = cp.asnumpy(Ahat)
else:
Ahat = fft(A, axis=2)
Akhat = np.zeros((l, m, n), dtype=complex)
for i in range(n):
slice_i = Ahat[:, :, i]
Ui, Si, Vhi = svd(slice_i, full_matrices=False)
ki = min(k, len(Si))
Akhat[:, :, i] = Ui[:, :ki] @ np.diag(Si[:ki]) @ Vhi[:ki, :]
Ak = ifft(Akhat, axis=2)
else:
Ak = np.zeros((l, m, n), dtype=complex)
for g in range(n):
Ug, Sg, Vhg = svd(A[:, :, g], full_matrices=False)
kg = min(k, len(Sg))
Ak[:, :, g] = Ug[:, :kg] @ np.diag(Sg[:kg]) @ Vhg[:kg, :]
if np.isreal(A).all():
Ak = np.real(Ak)
return Ak
# =========================================================================
# Group Initialization Methods
# =========================================================================
def _init_cyclic(self, n: int):
"""Initialize cyclic group Z_n."""
self.n = n
self.is_cyclic = True
I, J = np.meshgrid(np.arange(n), np.arange(n))
self.G = (I + J) % n
self.identity_idx = 0
self.F = dft(n)
self.Finv = np.conj(self.F) / n
def _init_klein4(self):
"""Initialize Klein four-group."""
self.n = 4
self.is_cyclic = False
self.G = np.array([
[0, 1, 2, 3],
[1, 0, 3, 2],
[2, 3, 0, 1],
[3, 2, 1, 0]
])
self.identity_idx = 0
self.F = np.array([
[1, 1, 1, 1],
[1, 1, -1, -1],
[1, -1, 1, -1],
[1, -1, -1, 1]
], dtype=float)
self.Finv = self.F / 4
def _init_dihedral(self, n: int):
"""Initialize dihedral group D_n."""
self.n = 2 * n
self.is_cyclic = False
self.G = np.zeros((2*n, 2*n), dtype=int)
for i in range(2*n):
for j in range(2*n):
if i < n and j < n:
self.G[i, j] = (i + j) % n
elif i < n and j >= n:
self.G[i, j] = ((j - n) + i) % n + n
elif i >= n and j < n:
self.G[i, j] = ((i - n) - j) % n + n
else:
self.G[i, j] = ((i - n) - (j - n)) % n
self.identity_idx = 0
self.F = dft(2*n)
self.Finv = np.conj(self.F) / (2*n)
def _init_symmetric(self, n: int):
"""Initialize symmetric group S_n."""
self.is_cyclic = False
perms_list = list(permutations(range(n)))
self.n = len(perms_list)
# Move identity to front
identity_perm = tuple(range(n))
identity_idx = perms_list.index(identity_perm)
if identity_idx != 0:
perms_list[0], perms_list[identity_idx] = perms_list[identity_idx], perms_list[0]
# Create permutation to index mapping
perm_to_idx = {perm: i for i, perm in enumerate(perms_list)}
# Build multiplication table
self.G = np.zeros((self.n, self.n), dtype=int)
for i in range(self.n):
for j in range(self.n):
# Compose permutations: (perms_list[i] ∘ perms_list[j])
composed = tuple(perms_list[i][k] for k in perms_list[j])
self.G[i, j] = perm_to_idx[composed]
self.identity_idx = 0
self.F = dft(self.n)
self.Finv = np.conj(self.F) / self.n
def _init_quaternion(self):
"""Initialize quaternion group Q_8."""
self.n = 8
self.is_cyclic = False
# Cayley table for Q_8 = {1, -1, i, -i, j, -j, k, -k}
# Using 0-indexed: {0, 1, 2, 3, 4, 5, 6, 7}
self.G = np.array([
[0, 1, 2, 3, 4, 5, 6, 7],
[1, 0, 3, 2, 5, 4, 7, 6],
[2, 3, 1, 0, 6, 7, 5, 4],
[3, 2, 0, 1, 7, 6, 4, 5],
[4, 5, 7, 6, 1, 0, 2, 3],
[5, 4, 6, 7, 0, 1, 3, 2],
[6, 7, 4, 5, 3, 2, 1, 0],
[7, 6, 5, 4, 2, 3, 0, 1]
])
self.identity_idx = 0
self.F = dft(8)
self.Finv = np.conj(self.F) / 8
def _init_product(self, groups: List['StarGAlgebra']):
"""Initialize direct product of groups."""
k = len(groups)
orders = [g.n for g in groups]
self.n = int(np.prod(orders))
self.is_cyclic = False
self.G = np.zeros((self.n, self.n), dtype=int)
for a in range(self.n):
a_idx = self._linear_to_multi_index(a, orders)
for b in range(self.n):
b_idx = self._linear_to_multi_index(b, orders)
c_idx = [groups[i].G[a_idx[i], b_idx[i]] for i in range(k)]
self.G[a, b] = self._multi_to_linear_index(c_idx, orders)
self.identity_idx = 0
# Kronecker product of Fourier matrices
self.F = groups[0].F
self.Finv = groups[0].Finv
for i in range(1, k):
self.F = np.kron(self.F, groups[i].F)
self.Finv = np.kron(self.Finv, groups[i].Finv)
def _linear_to_multi_index(self, lin: int, orders: List[int]) -> List[int]:
"""Convert linear index to multi-index."""
k = len(orders)
idx = [0] * k
for i in range(k-1, -1, -1):
idx[i] = lin % orders[i]
lin = lin // orders[i]
return idx
def _multi_to_linear_index(self, idx: List[int], orders: List[int]) -> int:
"""Convert multi-index to linear index."""
lin = 0
for i in range(len(orders)):
lin = lin * orders[i] + idx[i]
return lin
def _init_from_table(self, mult_table: np.ndarray):
"""Initialize from multiplication table."""
self.n = mult_table.shape[0]
self.G = mult_table.astype(int)
self.is_cyclic = False
self.F = dft(self.n)
self.Finv = np.conj(self.F) / self.n
# =========================================================================
# Utility Methods
# =========================================================================
def identity_tensor(self, m: int) -> np.ndarray:
"""Create identity tensor of size m x m x n."""
I = np.zeros((m, m, self.n))
I[:, :, self.identity_idx] = np.eye(m)
return I
def tensor_norm(self, A: np.ndarray) -> float:
"""Compute Frobenius norm of tensor."""
return np.sqrt(np.sum(np.abs(A) ** 2))
# =========================================================================
# Benchmark
# =========================================================================
def benchmark(self, sizes: List[int] = None):
"""Run performance benchmark."""
import time
if sizes is None:
sizes = [10, 20, 50, 100]
print("\n=== Performance Benchmark ===")
print(f"Group: n={self.n}, Cyclic={self.is_cyclic}, GPU={self.use_gpu}")
print(f"{'Size':<10} {'Direct (s)':<15} {'Optimized (s)':<15} {'Speedup':<10}")
print("-" * 55)
for sz in sizes:
A = np.random.randn(sz, sz, self.n)
B = np.random.randn(sz, sz, self.n)
# Warm up
_ = self.star_g(A, B)
# Direct timing
if sz <= 30:
start = time.time()
for _ in range(3):
self.star_g_direct(A, B)
t_direct = (time.time() - start) / 3
else:
t_direct = float('nan')
# Optimized timing
start = time.time()
for _ in range(10):
self.star_g_old(A, B)
t_opt = (time.time() - start) / 10
if not np.isnan(t_direct):
print(f"{sz:<10} {t_direct:<15.4f} {t_opt:<15.4f} {t_direct/t_opt:<10.1f}x")
else:
print(f"{sz:<10} {'N/A':<15} {t_opt:<15.4f} {'-':<10}")
# =========================================================================
# Verification Suite
# =========================================================================
def run_all_tests(self):
"""Run complete verification suite."""
print("=" * 40)
print("StarGAlgebra Verification Suite")
print(f"Group order: {self.n}, Cyclic: {self.is_cyclic}, Abelian: {self.is_abelian}")
print(f"Identity at index: {self.identity_idx}, GPU: {self.use_gpu}")
print("=" * 40 + "\n")
self._test_group_axioms()
self._test_convolution_tensor()
self._test_convolution_methods()
self._test_star_g_methods()
self._test_conjugate_transpose()
self._test_identity()
self._test_associativity()
self._test_svd()
print("\n" + "=" * 40)
print("All tests completed.")
print("=" * 40)
def _test_group_axioms(self):
"""Test group axioms."""
print("Test 1: Group Axioms")
passed = True
e = self.identity_idx
# Test identity
for a in range(self.n):
if self.G[e, a] != a or self.G[a, e] != a:
print(f" FAIL: Identity for {a}")
passed = False
# Test inverse
for a in range(self.n):
a_inv = self.inv_table[a]
if self.G[a, a_inv] != e or self.G[a_inv, a] != e:
print(f" FAIL: Inverse for {a}")
passed = False
# Test associativity (sample)
for _ in range(min(self.n**3, 500)):
a, b, c = np.random.randint(0, self.n, 3)
if self.G[self.G[a, b], c] != self.G[a, self.G[b, c]]:
print(" FAIL: Associativity")
passed = False
break
if passed:
print(" PASS")
def _test_convolution_tensor(self):
"""Test convolution tensor."""
print("\nTest 2: Convolution Tensor")
passed = True
for a in range(self.n):
for b in range(self.n):
c_exp = self.G[a, b]
for c in range(self.n):
if self.conv_tensor[a, b, c] != (c == c_exp):
passed = False
print(" PASS" if passed else " FAIL")
def _test_convolution_methods(self):
"""Test 1D convolution methods."""
print("\nTest 3: 1D Convolution")
a = np.random.randn(self.n)
b = np.random.randn(self.n)
c1 = self.convolve_direct(a, b)
c2 = self.convolve_inverse(a, b)
c3 = self.convolve(a, b)
err1 = np.linalg.norm(c1 - c2) / np.linalg.norm(c1)
err2 = np.linalg.norm(c1 - c3) / np.linalg.norm(c1)
print(f" Direct vs Inverse: {err1:.2e}")
print(f" Direct vs Optimized: {err2:.2e}")
tol = 1e-6
print(" PASS" if err1 < tol and err2 < tol else " FAIL")
def _test_star_g_methods(self):
"""Test StarG product methods."""
print("\nTest 4: StarG Product")
import time
A = np.random.randn(3, 4, self.n)
B = np.random.randn(4, 2, self.n)
start = time.time()
C_direct = self.star_g_direct(A, B)
t1 = time.time() - start
start = time.time()
C_main = self.star_g_old(A, B)
t2 = time.time() - start
err = np.linalg.norm(C_direct - C_main) / np.linalg.norm(C_direct)
print(f" Direct vs Main: error={err:.2e} ({t1:.4f}s vs {t2:.4f}s)")
tol = 1e-6
print(" PASS" if err < tol else " FAIL")
def _test_conjugate_transpose(self):
"""Test conjugate transpose."""
print("\nTest 5: Conjugate Transpose")
A = np.random.randn(3, 4, self.n) + 1j * np.random.randn(3, 4, self.n)
Ah1 = self.conjugate_transpose(A)
Ah2 = self.conjugate_transpose_fast(A)
err = np.linalg.norm(Ah1 - Ah2) / np.linalg.norm(Ah1)
print(f" PASS (error={err:.2e})" if err < 1e-10 else f" FAIL (error={err:.2e})")
def _test_identity(self):
"""Test identity tensor."""
print("\nTest 6: Identity")
m = 4
I = self.identity_tensor(m)
A = np.random.randn(m, m, self.n)
IA = self.star_g(I, A)
AI = self.star_g(A, I)
err1 = np.linalg.norm(IA - A) / np.linalg.norm(A)
err2 = np.linalg.norm(AI - A) / np.linalg.norm(A)
print(f" ||I*A - A||/||A|| = {err1:.2e}")
print(f" ||A*I - A||/||A|| = {err2:.2e}")
tol = 1e-10
print(" PASS" if err1 < tol and err2 < tol else " FAIL")
def _test_associativity(self):
"""Test associativity of StarG product."""
print("\nTest 7: Associativity")
A = np.random.randn(2, 3, self.n)
B = np.random.randn(3, 2, self.n)
C = np.random.randn(2, 2, self.n)
AB = self.star_g(A, B)
ABC_left = self.star_g(AB, C)
BC = self.star_g(B, C)
ABC_right = self.star_g(A, BC)
err = np.linalg.norm(ABC_left - ABC_right) / np.linalg.norm(ABC_left)
print(f" PASS (error={err:.2e})" if err < 1e-10 else f" FAIL (error={err:.2e})")
def _test_svd(self):
"""Test SVD decomposition."""
print("\nTest 8: SVD")
if not self.is_cyclic:
print(" SKIP (non-cyclic)")
return
A = np.random.randn(4, 3, self.n)
U, S, V = self.star_g_svd(A)
Vh = self.conjugate_transpose(V)
A_rec = self.star_g(self.star_g(U, S), Vh)
err = np.linalg.norm(A - A_rec) / np.linalg.norm(A)
print(f" PASS (error={err:.2e})" if err < 1e-10 else f" FAIL (error={err:.2e})")
# =============================================================================
# Additional Stable SVD Functions
# =============================================================================
def star_g_svd_stable(algebra: StarGAlgebra, A: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, dict]:
"""
Numerically stable ★_G-SVD with exact invariants.
Parameters
----------
algebra : StarGAlgebra
The group algebra instance
A : ndarray of shape (l, m, n)
Input tensor
Returns
-------
U, S, V : ndarrays
Standard ★_G-SVD factors
sv_invariant : dict
Contains exactly invariant quantities:
- magnitudes: |singular values| in each Fourier slice
- power_spec: Power spectrum of input
- trace_invs: Trace invariants
"""
l, m, n = A.shape
minlm = min(l, m)
sv_invariant = {}
if algebra.is_cyclic:
# Fourier domain computation
Ahat = fft(A, axis=2)
n_freq = n // 2 + 1 # One-sided spectrum
Uhat = np.zeros((l, minlm, n), dtype=complex)
Shat = np.zeros((minlm, minlm, n), dtype=complex)
Vhat = np.zeros((m, minlm, n), dtype=complex)
sv_mags = np.zeros((minlm, n_freq))
for k in range(n):
slice_k = Ahat[:, :, k]
Uk, Sk, Vhk = svd(slice_k, full_matrices=False)
kk = min(len(Sk), minlm)
Uhat[:, :kk, k] = Uk[:, :kk]
Shat[:kk, :kk, k] = np.diag(Sk[:kk])
Vhat[:, :kk, k] = Vhk[:kk, :].T
if k < n_freq:
sv_mags[:kk, k] = np.abs(Sk[:kk])
U = ifft(Uhat, axis=2)
S = ifft(Shat, axis=2)
V = ifft(Vhat, axis=2)
sv_invariant['magnitudes'] = np.sort(sv_mags, axis=0)[::-1]
else:
# Direct computation for non-cyclic groups
U = np.zeros((l, minlm, n), dtype=complex)
S = np.zeros((minlm, minlm, n), dtype=complex)
V = np.zeros((m, minlm, n), dtype=complex)
sv_mags = np.zeros((minlm, n))
for g in range(n):
Ug, Sg, Vhg = svd(A[:, :, g], full_matrices=False)
kk = min(len(Sg), minlm)
U[:, :kk, g] = Ug[:, :kk]
S[:kk, :kk, g] = np.diag(Sg[:kk])
V[:, :kk, g] = Vhg[:kk, :].T
sv_mags[:kk, g] = Sg[:kk]
sv_invariant['magnitudes'] = np.sort(sv_mags, axis=0)[::-1]
# Additional invariants
sv_invariant['power_spec'] = np.abs(fft(A, axis=2)) ** 2
# Trace invariants
sv_invariant['trace_invs'] = np.zeros(4)
A_flat = A.reshape(l * m, n)
sv_invariant['trace_invs'][0] = np.sum(A_flat ** 2) # Frobenius norm²
for g in range(n):
Ag = A[:, :, g]
sv_invariant['trace_invs'][1] += np.trace(Ag @ Ag.T)
sv_invariant['trace_invs'][2] += np.trace((Ag @ Ag.T) @ (Ag @ Ag.T))
sv_invariant['trace_invs'][3] = np.sum(svd(A_flat, compute_uv=False)) # Nuclear norm
if np.isreal(A).all():
U = np.real(U)
S = np.real(S)
V = np.real(V)
return U, S, V, sv_invariant
def extract_invariant_features_stable(algebra: StarGAlgebra, A: np.ndarray) -> np.ndarray:
"""
Extract exactly invariant features from tensor.
Uses only algebraically exact invariants with numerical safeguards.
Parameters
----------
algebra : StarGAlgebra
The group algebra instance
A : ndarray of shape (l, m, n)
Input tensor
Returns
-------
feat : ndarray
Feature vector containing invariant quantities
"""
l, m, n = A.shape
# Get stable SVD with invariants
_, _, _, sv_inv = star_g_svd_stable(algebra, A)
feat = []
# 1. Singular value magnitudes (flattened, sorted)
sv_flat = sv_inv['magnitudes'].flatten()
feat.extend(sv_flat)
# 2. Power spectrum statistics
ps = sv_inv['power_spec']
ps_sum = np.sum(ps)
ps_max = np.max(ps)
ps_mean = np.mean(ps)
ps_std = np.std(ps)
feat.extend([ps_sum, ps_max, ps_mean, ps_std])
# 3. Trace invariants
feat.extend(sv_inv['trace_invs'])
# 4. Gram matrix eigenvalues for each slice (averaged)
eig_sum = np.zeros(l)
for g in range(n):
Ag = A[:, :, g]
eig_g = np.sort(np.real(np.linalg.eigvals(Ag @ Ag.T)))[::-1]
if len(eig_g) < l:
eig_g = np.concatenate([eig_g, np.zeros(l - len(eig_g))])
eig_sum += eig_g[:l]
feat.extend(eig_sum / n)
# Round for exact invariance
feat = np.round(feat, 12)
return np.array(feat)
# =============================================================================
# Example Usage
# =============================================================================
if __name__ == "__main__":
# Create cyclic group algebra
print("Testing Cyclic Group Z_5:")
alg = StarGAlgebra('cyclic', 5)
alg.run_all_tests()
print("\n" + "="*60 + "\n")
# Create Klein-4 group algebra
print("Testing Klein-4 Group:")
alg_k4 = StarGAlgebra('klein4')
alg_k4.run_all_tests()
print("\n" + "="*60 + "\n")
# Create dihedral group algebra
print("Testing Dihedral Group D_3:")
alg_d3 = StarGAlgebra('dihedral', 3)
alg_d3.run_all_tests()
print("\n" + "="*60 + "\n")
# Test stable SVD
print("Testing Stable SVD:")
A = np.random.randn(4, 3, 5)
U, S, V, inv = star_g_svd_stable(alg, A)
print(f" Invariant magnitudes shape: {inv['magnitudes'].shape}")
print(f" Trace invariants: {inv['trace_invs']}")
# Test feature extraction
feat = extract_invariant_features_stable(alg, A)
print(f" Feature vector length: {len(feat)}")
# Benchmark
print("\n" + "="*60 + "\n")
print("Running benchmark:")
alg.benchmark([5, 10, 20])