"""
NeuralStarGFramework.py - Neural Network Framework Built on ★_G Algebra
LH & SU & Claude 2026
A neural network implementation that uses the Star-G algebra for
equivariant tensor operations.
"""
import numpy as np
from typing import List, Tuple, Optional, Dict, Callable, Any
import time
from dataclasses import dataclass, field
@dataclass
class TrainingHistory:
"""Container for training metrics."""
train_loss: List[float] = field(default_factory=list)
val_loss: List[float] = field(default_factory=list)
train_r2: List[float] = field(default_factory=list)
val_r2: List[float] = field(default_factory=list)
@dataclass
class Gradients:
"""Container for gradient tensors."""
weights: List[np.ndarray] = field(default_factory=list)
biases: List[np.ndarray] = field(default_factory=list)
class NeuralStarGFramework:
"""
Neural Network Framework Built on ★_G Algebra.
Implements a multi-layer neural network where weight matrices are
3D tensors and matrix multiplication is replaced by the ★_G product,
providing group equivariance.
Parameters
----------
G : StarGAlgebra
The group algebra instance defining the ★_G product
layer_sizes : list of int
Sizes of each layer including input and output
learning_rate : float, optional
Initial learning rate for Adam optimizer (default: 0.001)
use_gpu : bool, optional
Whether to use GPU acceleration (default: False)
Attributes
----------
weights : list of ndarray
Weight tensors for each layer, shape (out_dim, in_dim, n_group)
biases : list of ndarray
Bias tensors for each layer, shape (out_dim, 1, n_group)
history : TrainingHistory
Training metrics history
Example
-------
>>> from star_g_algebra import StarGAlgebra
>>> G = StarGAlgebra('cyclic', 8)
>>> net = NeuralStarGFramework(G, [16, 32, 16, 1], learning_rate=0.001)
>>> net.train(X_train, Y_train, X_val, Y_val, epochs=100)
>>> predictions = net.predict(X_test)
"""
def __init__(
self,
G: Any, # StarGAlgebra
layer_sizes: List[int],
learning_rate: float = 0.001,
use_gpu: bool = False
):
self.G = G
self.layers = layer_sizes
self.learning_rate = learning_rate
self.use_gpu = use_gpu
self.weight_decay = 1e-4
self.t = 0 # Adam timestep
n_layers = len(layer_sizes) - 1
# Initialize weight and bias tensors
self.weights: List[np.ndarray] = []
self.biases: List[np.ndarray] = []
# Adam optimizer moment estimates
self.m_weights: List[np.ndarray] = []
self.v_weights: List[np.ndarray] = []
self.m_biases: List[np.ndarray] = []
self.v_biases: List[np.ndarray] = []
# Activation functions: ReLU for hidden layers, linear for output
self.activations: List[Callable] = []
for l in range(n_layers - 1):
self.activations.append(lambda x: np.maximum(0, x))
self.activations.append(lambda x: x) # Linear for output
# Xavier/Glorot initialization
for l in range(n_layers):
fan_in = layer_sizes[l]
fan_out = layer_sizes[l + 1]
scale = np.sqrt(2.0 / (fan_in + fan_out))
# Weight tensor: (out_dim, in_dim, n_group)
W = scale * np.random.randn(fan_out, fan_in, G.n)
self.weights.append(W)
# Bias tensor: (out_dim, 1, n_group)
b = np.zeros((fan_out, 1, G.n))
self.biases.append(b)
# Initialize Adam moments to zeros
self.m_weights.append(np.zeros_like(W))
self.v_weights.append(np.zeros_like(W))
self.m_biases.append(np.zeros_like(b))
self.v_biases.append(np.zeros_like(b))
# Training history
self.history = TrainingHistory()
def forward(self, X: np.ndarray) -> Tuple[np.ndarray, List[np.ndarray]]:
"""
Forward pass through the network.
Parameters
----------
X : ndarray
Input tensor of shape (batch_size, n_features, n_group) or
(n_features, n_group) for single sample
Returns
-------
output : ndarray
Network output of shape (batch_size, out_dim, n_group)
cache : list of ndarray
Cached activations for each layer (used in backprop)
"""
n_layers = len(self.weights)
# Handle 2D input (single sample)
if X.ndim == 2:
batch_size = 1
n_feat = X.shape[0]
X = X.reshape(1, n_feat, self.G.n)
else:
batch_size = X.shape[0]
# Cache stores activations for backpropagation
cache: List[np.ndarray] = [X]
A = X
for l in range(n_layers):
W = self.weights[l]
b = self.biases[l]
out_dim, in_dim, n_g = W.shape
Z = np.zeros((batch_size, out_dim, n_g))
for i in range(batch_size):
# Extract sample i
A_i = A[i, :, :] # (in_dim, n_g) or needs transpose
# Ensure correct shape
if A_i.shape[0] != in_dim:
A_i = A_i.T
# Reshape for starG: (in_dim, 1, n_g)
A_i_3d = A_i.reshape(in_dim, 1, n_g)
# Star-G product: W (out_dim, in_dim, n_g) ★ A_i (in_dim, 1, n_g)
Z_i = self.G.star_g(W, A_i_3d) # (out_dim, 1, n_g)
# Add bias
Z_i = Z_i + b # (out_dim, 1, n_g)
# Store result
Z[i, :, :] = Z_i.squeeze(axis=1) # (out_dim, n_g)
# Apply activation function
A = self.activations[l](Z)
cache.append(A)
return A, cache
def invariant_pool(self, X: np.ndarray) -> np.ndarray:
"""
Pool across group dimension to get invariant output.
Computes mean over spatial and group dimensions.
Parameters
----------
X : ndarray of shape (batch_size, out_dim, n_group)
Returns
-------
y : ndarray of shape (batch_size,)
"""
# Mean over group dimension (axis 2), then over output dimension (axis 1)
y = np.mean(np.mean(X, axis=2), axis=1)
return y
def predict(self, X: np.ndarray) -> Tuple[np.ndarray, List[np.ndarray]]:
"""
Make predictions on input data.
Parameters
----------
X : ndarray
Input tensor
Returns
-------
y_pred : ndarray
Predictions of shape (batch_size,) or scalar
cache : list of ndarray
Cached activations
"""
output, cache = self.forward(X)
y_pred = self.invariant_pool(output)
y_pred = y_pred.squeeze()
return y_pred, cache
def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""
Compute mean squared error loss.
Parameters
----------
y_pred : ndarray
Predicted values
y_true : ndarray
True values
Returns
-------
loss : float
MSE loss value
"""
return float(np.mean((y_pred.flatten() - y_true.flatten()) ** 2))
def backward(self, X: np.ndarray, y_true: np.ndarray) -> Gradients:
"""
Compute gradients using numerical differentiation.
Uses central finite differences with random sampling for efficiency.
Parameters
----------
X : ndarray
Input batch
y_true : ndarray
True target values
Returns
-------
grads : Gradients
Container with weight and bias gradients
"""
epsilon = 1e-5
n_layers = len(self.weights)
grads = Gradients()
grads.weights = [np.zeros_like(W) for W in self.weights]
grads.biases = [np.zeros_like(b) for b in self.biases]
for l in range(n_layers):
W_orig = self.weights[l].copy()
d1, d2, d3 = W_orig.shape
total_params = d1 * d2 * d3
# Sample subset of parameters for efficiency
n_sample = min(30, total_params)
sample_idx = np.random.choice(total_params, n_sample, replace=False)
for idx in sample_idx:
# Convert flat index to 3D indices
ii, jj, kk = np.unravel_index(idx, (d1, d2, d3))
# Forward difference
self.weights[l][ii, jj, kk] = W_orig[ii, jj, kk] + epsilon
y_plus, _ = self.predict(X)
loss_plus = self.compute_loss(y_plus, y_true)
# Backward difference
self.weights[l][ii, jj, kk] = W_orig[ii, jj, kk] - epsilon
y_minus, _ = self.predict(X)
loss_minus = self.compute_loss(y_minus, y_true)
# Central difference gradient
grads.weights[l][ii, jj, kk] = (loss_plus - loss_minus) / (2 * epsilon)
# Restore original weight
self.weights[l][ii, jj, kk] = W_orig[ii, jj, kk]
# Scale gradient estimate to account for sampling
grads.weights[l] = grads.weights[l] * (total_params / n_sample)
return grads
def adam_update(self, grads: Gradients):
"""
Update weights using Adam optimizer.
Parameters
----------
grads : Gradients
Computed gradients
"""
beta1 = 0.9
beta2 = 0.999
eps = 1e-8
self.t += 1
for l in range(len(self.weights)):
# Update biased first moment estimate
self.m_weights[l] = beta1 * self.m_weights[l] + (1 - beta1) * grads.weights[l]
# Update biased second raw moment estimate
self.v_weights[l] = beta2 * self.v_weights[l] + (1 - beta2) * (grads.weights[l] ** 2)
# Compute bias-corrected estimates
m_hat = self.m_weights[l] / (1 - beta1 ** self.t)
v_hat = self.v_weights[l] / (1 - beta2 ** self.t)
# Update weights
self.weights[l] = self.weights[l] - self.learning_rate * m_hat / (np.sqrt(v_hat) + eps)
# Apply weight decay
self.weights[l] = self.weights[l] * (1 - self.weight_decay)
def train(
self,
X_train: np.ndarray,
Y_train: np.ndarray,
X_val: np.ndarray,
Y_val: np.ndarray,
epochs: int = 100,
batch_size: int = 32,
verbose: bool = True,
patience: int = 20
) -> 'NeuralStarGFramework':
"""
Train the neural network.
Parameters
----------
X_train : ndarray of shape (n_train, n_features, n_group)
Training input data
Y_train : ndarray of shape (n_train,)
Training targets
X_val : ndarray of shape (n_val, n_features, n_group)
Validation input data
Y_val : ndarray of shape (n_val,)
Validation targets
epochs : int, optional
Maximum number of training epochs (default: 100)
batch_size : int, optional
Mini-batch size (default: 32)
verbose : bool, optional
Whether to print training progress (default: True)
patience : int, optional
Early stopping patience (default: 20)
Returns
-------
self : NeuralStarGFramework
The trained network
"""
n_train = X_train.shape[0]
n_batches = int(np.ceil(n_train / batch_size))
# Reset history
self.history = TrainingHistory()
best_val_loss = np.inf
patience_counter = 0
best_weights = [W.copy() for W in self.weights]
best_biases = [b.copy() for b in self.biases]
if verbose:
print(f"\nTraining Neural Star_G Network")
print(f"Epochs: {epochs}, Batch: {batch_size}, LR: {self.learning_rate:.4f}\n")
for epoch in range(1, epochs + 1):
start_time = time.time()
# Shuffle training data
perm = np.random.permutation(n_train)
epoch_loss = 0.0
for batch in range(n_batches):
batch_start = batch * batch_size
batch_end = min((batch + 1) * batch_size, n_train)
batch_idx = perm[batch_start:batch_end]
X_batch = X_train[batch_idx]
Y_batch = Y_train[batch_idx]
# Forward pass
y_pred, _ = self.predict(X_batch)
batch_loss = self.compute_loss(y_pred, Y_batch)
epoch_loss += batch_loss
# Backward pass and update
grads = self.backward(X_batch, Y_batch)
self.adam_update(grads)
epoch_time = time.time() - start_time
# Record training loss
train_loss = epoch_loss / n_batches
self.history.train_loss.append(train_loss)
# Validation metrics
y_val_pred, _ = self.predict(X_val)
val_loss = self.compute_loss(y_val_pred, Y_val)
self.history.val_loss.append(val_loss)
# Compute R² scores
y_train_pred, _ = self.predict(X_train)
train_r2 = self._compute_r2(y_train_pred, Y_train)
val_r2 = self._compute_r2(y_val_pred, Y_val)
self.history.train_r2.append(train_r2)
self.history.val_r2.append(val_r2)
# Early stopping check
if val_loss < best_val_loss:
best_val_loss = val_loss
best_weights = [W.copy() for W in self.weights]
best_biases = [b.copy() for b in self.biases]
patience_counter = 0
else:
patience_counter += 1
# Print progress
if verbose and epoch % 5 == 0:
print(f"Epoch {epoch:3d}: Loss={train_loss:.4f}, "
f"R2={train_r2:.4f}, Val R2={val_r2:.4f} ({epoch_time:.1f}s)")
# Early stopping
if patience_counter >= patience:
if verbose:
print(f"Early stopping at epoch {epoch}")
break
# Restore best weights
self.weights = best_weights
self.biases = best_biases
if verbose:
best_val_r2 = max(self.history.val_r2)
print(f"\nBest Val R2: {best_val_r2:.4f}\n")
return self
def _compute_r2(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""Compute R² (coefficient of determination)."""
y_pred = y_pred.flatten()
y_true = y_true.flatten()
ss_res = np.sum((y_pred - y_true) ** 2)
ss_tot = np.sum((y_true - np.mean(y_true)) ** 2) + 1e-10
return float(1 - ss_res / ss_tot)
def compress_weights(self, rank: int):
"""
Compress network weights using truncated ★_G-SVD.
Parameters
----------
rank : int
Target rank for truncation
"""
print(f"Compressing to rank {rank}...")
for l in range(len(self.weights)):
W_orig = self.weights[l]
W_comp = self.G.truncate(W_orig, rank)
orig_norm = np.linalg.norm(W_orig) + 1e-10
err = np.linalg.norm(W_orig - W_comp) / orig_norm
self.weights[l] = W_comp
print(f"Layer {l + 1}: {err * 100:.2f}% error")
def count_parameters(self) -> int:
"""Count total number of trainable parameters."""
total = 0
for W, b in zip(self.weights, self.biases):
total += W.size + b.size
return total
def summary(self):
"""Print network architecture summary."""
print("\n" + "=" * 50)
print("Neural Star-G Network Summary")
print("=" * 50)
print(f"Group order: {self.G.n}")
print(f"Layer sizes: {self.layers}")
print(f"Learning rate: {self.learning_rate}")
print(f"Weight decay: {self.weight_decay}")
print("-" * 50)
total_params = 0
for l, (W, b) in enumerate(zip(self.weights, self.biases)):
w_params = W.size
b_params = b.size
layer_params = w_params + b_params
total_params += layer_params
print(f"Layer {l + 1}:")
print(f" Weight shape: {W.shape}")
print(f" Bias shape: {b.shape}")
print(f" Parameters: {layer_params:,}")
print("-" * 50)
print(f"Total parameters: {total_params:,}")
print("=" * 50 + "\n")
def save(self, filepath: str):
"""
Save network weights to file.
Parameters
----------
filepath : str
Path to save file (.npz format)
"""
save_dict = {
'layers': np.array(self.layers),
'learning_rate': self.learning_rate,
'weight_decay': self.weight_decay,
't': self.t,
}
# Save weights and biases
for l in range(len(self.weights)):
save_dict[f'weight_{l}'] = self.weights[l]
save_dict[f'bias_{l}'] = self.biases[l]
save_dict[f'm_weight_{l}'] = self.m_weights[l]
save_dict[f'v_weight_{l}'] = self.v_weights[l]
save_dict[f'm_bias_{l}'] = self.m_biases[l]
save_dict[f'v_bias_{l}'] = self.v_biases[l]
np.savez(filepath, **save_dict)
print(f"Model saved to {filepath}")
def load(self, filepath: str):
"""
Load network weights from file.
Parameters
----------
filepath : str
Path to saved file (.npz format)
"""
data = np.load(filepath)
self.layers = data['layers'].tolist()
self.learning_rate = float(data['learning_rate'])
self.weight_decay = float(data['weight_decay'])
self.t = int(data['t'])
n_layers = len(self.layers) - 1
self.weights = []
self.biases = []
self.m_weights = []
self.v_weights = []
self.m_biases = []
self.v_biases = []
for l in range(n_layers):
self.weights.append(data[f'weight_{l}'])
self.biases.append(data[f'bias_{l}'])
self.m_weights.append(data[f'm_weight_{l}'])
self.v_weights.append(data[f'v_weight_{l}'])
self.m_biases.append(data[f'm_bias_{l}'])
self.v_biases.append(data[f'v_bias_{l}'])
print(f"Model loaded from {filepath}")
# =============================================================================
# Convenience Functions
# =============================================================================
def create_star_g_network(
G: Any,
input_dim: int,
hidden_dims: List[int],
output_dim: int,
**kwargs
) -> NeuralStarGFramework:
"""
Convenience function to create a Star-G neural network.
Parameters
----------
G : StarGAlgebra
Group algebra instance
input_dim : int
Input feature dimension
hidden_dims : list of int
Hidden layer dimensions
output_dim : int
Output dimension
**kwargs
Additional arguments passed to NeuralStarGFramework
Returns
-------
net : NeuralStarGFramework
Initialized network
"""
layer_sizes = [input_dim] + hidden_dims + [output_dim]
return NeuralStarGFramework(G, layer_sizes, **kwargs)
# =============================================================================
# Example Usage and Tests
# =============================================================================
if __name__ == "__main__":
# Import the StarGAlgebra (assuming it's in the same directory)
try:
from star_g_algebra import StarGAlgebra
except ImportError:
print("StarGAlgebra not found. Creating minimal mock for testing.")
class StarGAlgebra:
"""Minimal mock for testing."""
def __init__(self, group_type, n):
self.n = n
self.is_cyclic = True
def star_g(self, A, B):
"""Simple FFT-based star product for cyclic groups."""
from numpy.fft import fft, ifft
Ahat = fft(A, axis=2)
Bhat = fft(B, axis=2)
Chat = np.zeros((A.shape[0], B.shape[1], A.shape[2]), dtype=complex)
for k in range(A.shape[2]):
Chat[:, :, k] = Ahat[:, :, k] @ Bhat[:, :, k]
C = ifft(Chat, axis=2)
return np.real(C)
def truncate(self, A, k):
"""Simple truncation."""
return A
print("=" * 60)
print("NeuralStarGFramework Test Suite")
print("=" * 60)
# Create group algebra
n_group = 8
G = StarGAlgebra('cyclic', n_group)
print(f"\nGroup order: {G.n}")
# Create network
input_dim = 16
hidden_dims = [32, 16]
output_dim = 1
net = NeuralStarGFramework(
G,
[input_dim] + hidden_dims + [output_dim],
learning_rate=0.001
)
# Print summary
net.summary()
# Generate synthetic data
print("Generating synthetic data...")
n_train = 100
n_val = 20
X_train = np.random.randn(n_train, input_dim, n_group)
Y_train = np.sum(np.mean(X_train, axis=2), axis=1) + np.random.randn(n_train) * 0.1
X_val = np.random.randn(n_val, input_dim, n_group)
Y_val = np.sum(np.mean(X_val, axis=2), axis=1) + np.random.randn(n_val) * 0.1
print(f"X_train shape: {X_train.shape}")
print(f"Y_train shape: {Y_train.shape}")
# Test forward pass
print("\nTesting forward pass...")
output, cache = net.forward(X_train[:5])
print(f"Output shape: {output.shape}")
# Test prediction
print("\nTesting prediction...")
y_pred, _ = net.predict(X_train[:5])
print(f"Predictions shape: {y_pred.shape}")
print(f"Sample predictions: {y_pred[:3]}")
# Test training (short run)
print("\nTesting training (10 epochs)...")
net.train(
X_train, Y_train,
X_val, Y_val,
epochs=10,
batch_size=16,
verbose=True,
patience=5
)
# Test compression
print("\nTesting weight compression...")
net.compress_weights(rank=2)
# Test save/load
print("\nTesting save/load...")
net.save("test_model.npz")
net2 = NeuralStarGFramework(G, [input_dim] + hidden_dims + [output_dim])
net2.load("test_model.npz")
# Verify loaded model gives same predictions
y_pred1, _ = net.predict(X_val[:3])
y_pred2, _ = net2.predict(X_val[:3])
print(f"Original predictions: {y_pred1}")
print(f"Loaded predictions: {y_pred2}")
print(f"Match: {np.allclose(y_pred1, y_pred2)}")
# Cleanup
import os
if os.path.exists("test_model.npz"):
os.remove("test_model.npz")
print("\n" + "=" * 60)
print("All tests completed successfully!")
print("=" * 60)