ARMED-MixedEffectsDL / synthetic_dataset / spirals.py
spirals.py
Raw
import numpy as np
import seaborn as sns
sns.set_style('whitegrid')
import matplotlib.pyplot as plt
from scipy.spatial import KDTree

def make_spirals(points=1000, classes=2, degrees=360, radius=1, noise=0):
    """Generate the spirals bivariate classification problem

    Args:
        points (int, optional): number of samples. Defaults to 1000.
        classes (int, optional): number of classes. Defaults to 2.
        degrees (int, optional): length of spirals in degrees. Defaults to 360.
        radius (int, optional): radius to each spiral end. Defaults to 1.
        noise (int, optional): added Gaussian noise. Defaults to 0.

    Returns:
        array, array: x, y (one-hot labels if classes > 2)
    """    

    arrNPoints = np.zeros((classes,), dtype=int)
    for iClass in range(classes-1):
        arrNPoints[iClass] = points // classes
    arrNPoints[-1] = points - arrNPoints.sum()
    
    lsX = []
    lsY = []
    
    for iClass in range(classes):
        nPoints = arrNPoints[iClass]
        arrPoints = np.random.randint(0, degrees, size=nPoints) * np.pi / 180
        
        phase = iClass * 2 * np.pi / classes
        
        arrX1 = -np.cos(arrPoints - phase) * arrPoints * radius / arrPoints.max()
        arrX2 = np.sin(arrPoints - phase) * arrPoints * radius / arrPoints.max()
        arrX = np.stack([arrX1, arrX2]).T
        arrX += np.random.normal(scale=noise, size=(nPoints, 2))
        
        arrY = np.zeros((nPoints, classes))
        arrY[:, iClass] = 1
            
        lsX += [arrX]
        lsY += [arrY]
        
    return np.concatenate(lsX, axis=0), np.concatenate(lsY, axis=0)
        

def make_spiral_random_slope(clusters, points_per_cluster=1000, inter_cluster_sd=0.2, 
                             classes=2, degrees=360, radius=1, noise=0):
    """Generate spirals classification problem with data grouped into equal sized 
    clusters. Each cluster has a random effect slope applied to the 2nd feature.

    Args:
        clusters (int): number of clusters
        points_per_cluster (int, optional): Defaults to 1000.
        inter_cluster_sd (float, optional): S.d. of normally distributed random slopes. Defaults to 0.2.
        classes (int, optional): number of classes. Defaults to 2.
        degrees (int, optional): length of spirals in degrees. Defaults to 360.
        radius (int, optional): radius to each spiral end. Defaults to 1.
        noise (int, optional): added Gaussian noise. Defaults to 0.

    Returns:
        features, cluster membership matrix, labels, cluster random slopes
    """    
    
    arrRandomSlopes = np.random.normal(scale=inter_cluster_sd, size=(clusters,))
    
    lsX = []
    lsY = []
    lsZ = []
    for iCluster in range(clusters):
        arrXCluster, arrYCluster = make_spirals(points=points_per_cluster, classes=classes, degrees=degrees,
                                                radius=radius, noise=noise)
        arrXCluster[:, 1] *= (1 / (arrRandomSlopes[iCluster] + 1 + 1e-7))
                
        arrZCluster = np.zeros((arrXCluster.shape[0], clusters))
        arrZCluster[:, iCluster] = 1
        
        lsX += [arrXCluster]
        lsY += [arrYCluster]
        lsZ += [arrZCluster]
        
    return np.concatenate(lsX, axis=0), np.concatenate(lsZ, axis=0), np.concatenate(lsY, axis=0), arrRandomSlopes


def make_spiral_random_radius(clusters, 
                              points_per_cluster=1000, 
                              mean_radius=1.0,
                              inter_cluster_sd=0.2, 
                              classes=2, 
                              degrees=360, 
                              noise=0):
    """Generate spirals classification problem with data grouped into equal sized 
    clusters. Each cluster has a random radius drawn from a normal distribution.

    Args:
        clusters (int): number of clusters
        points_per_cluster (int, optional): Defaults to 1000.
        mean_radius (float, optional): Mean of normally distributed random radii. Defaults to 1.0.
        inter_cluster_sd (float, optional): S.d. of normally distributed random radii. Defaults to 0.2.
        classes (int, optional): number of classes. Defaults to 2.
        degrees (int, optional): length of spirals in degrees. Defaults to 360.
        noise (int, optional): added Gaussian noise. Defaults to 0.

    Returns:
        features, cluster membership matrix, labels, cluster random radii
    """    
    
    arrRadii = np.random.normal(loc=mean_radius, scale=inter_cluster_sd, size=(clusters,))
    
    lsX = []
    lsY = []
    lsZ = []
    for iCluster in range(clusters):
        arrXCluster, arrYCluster = make_spirals(points=points_per_cluster, classes=classes, degrees=degrees,
                                                radius=arrRadii[iCluster], noise=noise)
                        
        arrZCluster = np.zeros((arrXCluster.shape[0], clusters))
        arrZCluster[:, iCluster] = 1
        
        lsX += [arrXCluster]
        lsY += [arrYCluster]
        lsZ += [arrZCluster]
        
    return np.concatenate(lsX, axis=0), np.concatenate(lsZ, axis=0), np.concatenate(lsY, axis=0), arrRadii


def make_spiral_random_radius_confounder(clusters, 
                                         points_per_cluster=1000, 
                                         mean_radius=1.0,
                                         radius_sd=0.2,
                                         ratio_sd=0.2, 
                                         degrees=360, 
                                         noise=0, 
                                         confounders=1):
    """Generate spirals classification problem with data grouped into equal sized 
    clusters. Each cluster has a random radius drawn from a normal distribution. A
    confounding effect is simulated by varying the the class ratio across clusters, 
    then adding one or more confounded independent variables correlated with the
    class ratios.

    Args:
        clusters (int): number of clusters
        points_per_cluster (int, optional): Defaults to 1000.
        mean_radius (float, optional): Mean of normally distributed random radii. Defaults to 1.0.
        radius_sd (float, optional): S.d. of normally distributed random radii. Defaults to 0.2.
        ratio_sd (float, optional): S.d. of normally distributed class ratios, controls 
            strength of confounding effect. Defaults to 0.2. 
        degrees (int, optional): length of spirals in degrees. Defaults to 360.
        noise (int, optional): added Gaussian noise. Defaults to 0.
        confounders (int, optional): number of confounded variables to add. Defaults to 1.

    Returns:
        features, cluster membership matrix, labels, cluster random radii, cluster class ratios
    """ 
    
    arrRadii = np.random.normal(loc=mean_radius, scale=radius_sd, size=(clusters,))
    arrRatio = np.random.normal(loc=0.5, scale=ratio_sd, size=(clusters,))
    arrRatio[arrRatio < 0.1] = 0.1
    arrRatio[arrRatio > 0.9] = 0.9   
    
    lsX = []
    lsY = []
    lsZ = []
    for iCluster in range(clusters):
        arrXCluster, arrYCluster = make_spirals(points=2 * points_per_cluster, degrees=degrees,
                                                radius=arrRadii[iCluster], noise=noise)

        nClass0 = int(points_per_cluster * arrRatio[iCluster])
        nClass1 = points_per_cluster - nClass0

        arrClass0 = np.where(arrYCluster[:, 0])[0]
        arrClass0 = arrClass0[:nClass0]
        arrClass1 = np.where(arrYCluster[:, 1])[0]
        arrClass1 = arrClass1[:nClass1]
        arrPoints = np.concatenate([arrClass0, arrClass1], axis=0)
        
        arrXCluster = arrXCluster[arrPoints]
        arrYCluster = arrYCluster[arrPoints]
        
        arrZCluster = np.zeros((arrXCluster.shape[0], clusters))
        arrZCluster[:, iCluster] = 1
        
        arrConfounders = np.random.normal(loc=arrRatio[iCluster], scale=0.2, size=(points_per_cluster, confounders))
        arrXCluster = np.concatenate((arrXCluster, arrConfounders), axis=1)
        
        lsX += [arrXCluster]
        lsY += [arrYCluster]
        lsZ += [arrZCluster]
        
    return np.concatenate(lsX, axis=0), np.concatenate(lsZ, axis=0), np.concatenate(lsY, axis=0), arrRadii, arrRatio


def make_spiral_true_boundary(classes=2, degrees=360, radius=1):
    """Create points along the "real" decision boundary between 2 spirals

    Args:
        classes (int, optional): number of classes. Defaults to 2.
        degrees (int, optional): length of spirals in degrees. Defaults to 360.
        radius (int, optional): radius to each spiral end. Defaults to 1.
        
    """    
    
    lsX = []
    lsY = []
    
    arrPoints = np.linspace(0, degrees * 1.5, 1000) * np.pi / 180
    radmax = 1.5 * radius / arrPoints.max()
    for iClass in range(classes):
                
        phase = iClass * 2 * np.pi / classes
        
        arrX1 = -np.cos(arrPoints - phase) * arrPoints * radmax
        arrX2 = np.sin(arrPoints - phase) * arrPoints * radmax
        arrX = np.stack([arrX1, arrX2]).T
        
        arrY = np.zeros((1000, classes))
        arrY[:, iClass] = 1
            
        lsX += [arrX]
        lsY += [arrY]
        
    arrSpiralX = np.concatenate(lsX, axis=0)
    arrSpiralY = np.concatenate(lsY, axis=0)
    
    arrSpiral1X = arrSpiralX[arrSpiralY[:, 0] == 1, :]
    arrSpiral2X = arrSpiralX[arrSpiralY[:, 1] == 1, :]

    # For each point in class 1, find nearest point in class 2 and compute the midpoint
    tree = KDTree(arrSpiral2X)
    arrSpiralMid = np.zeros_like(arrSpiral1X)
    for iPoint in range(arrSpiral1X.shape[0]):
        arrPoint1 = arrSpiral1X[iPoint, :]
        _, iPoint2 = tree.query(arrPoint1, 1)
        arrPoint2 = arrSpiral2X[iPoint2, :]
        arrSpiralMid[iPoint] = 0.5 * (arrPoint1 + arrPoint2)

    # Add the inverted points
    arrSpiralMid = np.concatenate([arrSpiralMid, -arrSpiralMid], axis=0)
        
    return arrSpiralMid

        
def plot_clusters(X, Z, Y, random_effects=None, true_spiral_params=None):
    """Plot data points in each cluster.

    Args:
        X (array): independent variables
        Z (array): cluster membership design matrix
        Y (array): labels
        random_effects (array, optional): Cluster-specific random radii, 
            used to create the titles for each subplot. Defaults to None.
        true_spiral_params (dict, optional): Spiral parameters with keys 
            'classes' and 'degrees'. Defaults to None.

    Returns:
        figure, axes
    """    
    nClusters = Z.shape[1]
    nRows = int(np.ceil(nClusters / 5))

    lsMarkers = ['o', 'P', 'X', 'D']    
    fig, ax = plt.subplots(nRows, 5, figsize=(16, 3 * nRows), gridspec_kw={'hspace': 0.5})
    for iCluster in range(nClusters):       
        XCluster = X[Z[:, iCluster] == 1, :]
        YCluster = Y[Z[:, iCluster] == 1,]
         
        iRow = iCluster // 5
        iCol = iCluster % 5

        for iClass in range(Y.shape[1]):
            ax[iRow, iCol].scatter(XCluster[YCluster[:, iClass] == 1, 0], 
                                   XCluster[YCluster[:, iClass] == 1, 1], 
                                   c=f'C{iClass}', 
                                   s=1,
                                   marker=lsMarkers[iClass])
        
        if random_effects is not None:
            ax[iRow, iCol].set_title(f'Random effect: {random_effects[iCluster]:.03f}')
        if iCluster >= 5:
            ax[iRow, iCol].set_xlabel('Feature 1')
        if iCol == 0:
            ax[iRow, iCol].set_ylabel('Feature 2')
            
        if true_spiral_params:
            arrSpiralMid = make_spiral_true_boundary(true_spiral_params['classes'], 
                                                     true_spiral_params['degrees'], 
                                                     random_effects[iCluster])
            
            ax[iRow, iCol].plot(arrSpiralMid[:1000, 0], arrSpiralMid[:1000, 1], c='0.2', ls='--')
            ax[iRow, iCol].plot(arrSpiralMid[1000:, 0], arrSpiralMid[1000:, 1], c='0.2', ls='--')
         
        ax[iRow, iCol].set_xlim(X[:, 0].min(), X[:, 0].max())
        ax[iRow, iCol].set_ylim(X[:, 1].min(), X[:, 1].max())
        ax[iRow, iCol].set_aspect('equal')
        ax[iRow, iCol].set_aspect('equal')
    return fig, ax


def plot_clusters_feature_hist(X, Z, Y, feature_idx, random_effects=None):
    """For each cluster, plot a histogram of feature values..

    Args:
        X (array): independent variables
        Z (array): cluster membership design matrix
        Y (array): labels
        feature_idx (int): feature to plot
        random_effects (array, optional): Cluster-specific random radii, 
            used to create the titles for each subplot. Defaults to None.

    Returns:
        figure, axes
    """    
    
    nClusters = Z.shape[1]
    nRows = int(np.ceil(nClusters / 5))

    lsMarkers = ['o', 'P', 'X', 'D']    
    fig, ax = plt.subplots(nRows, 5, figsize=(16, 3 * nRows), gridspec_kw={'hspace': 0.5})
    for iCluster in range(nClusters):       
        XCluster = X[Z[:, iCluster] == 1, :]
        YCluster = Y[Z[:, iCluster] == 1,]
         
        iRow = iCluster // 5
        iCol = iCluster % 5

        ax[iRow, iCol].hist(XCluster[:, feature_idx], color=(0.5, 0.5, 0.5))
        
        if random_effects is not None:
            ax[iRow, iCol].set_title(f'Class balance: {random_effects[iCluster]:.03f}')
        if iCluster >= 5:
            ax[iRow, iCol].set_xlabel(f'Feature {feature_idx + 1}')
        if iCol == 0:
            ax[iRow, iCol].set_ylabel('Frequency')
         
        ax[iRow, iCol].set_xlim(X[:, feature_idx].min(), X[:, feature_idx].max())

    return fig, ax


def plot_decision_boundary(model, X, Y, Z=None, ax=None, vmax=2):
    """Plot decision boundary learned by a trained model

    Args:
        model (tf.keras.Model): trained model
        X (np.array): input array
        Y (np.array): one-hot labels
        Z (np.array, optional): cluster membership input (design matrix). Defaults to None.
        ax (plt.Axes, optional): matplotlib axes to plot to. Defaults to None.
        vmax (int, optional): how far out in input space to compute decision boundary. Defaults to 2.

    Returns:
        plt.Axes
    """      
    # Create grid of values over X-space     
    arrGridX1, arrGridX2 = np.mgrid[-vmax:vmax+0.1:0.1, -vmax:vmax+0.1:0.1]
    # Assume that the mean of the remaining features is 0.5
    arrGridXConf = np.ones((arrGridX1.size, X.shape[1])) * 0.5
    arrGridX = np.concatenate([arrGridX1.reshape(-1, 1), arrGridX2.reshape(-1, 1), arrGridXConf], axis=1)
    
    if Z is not None:
        arrGridZ = np.zeros((arrGridX.shape[0], Z.shape[1]))
        arrGridZ[:, Z.sum(axis=0).argmax()] = 1
        arrGridYFlat = model.predict((arrGridX, arrGridZ), verbose=0)
    else:
        arrGridYFlat = model.predict(arrGridX, verbose=0) 

    arrGridY = (arrGridYFlat[:, 1] >= 0.5).reshape(arrGridX1.shape).astype(int)
    
    if ax is None:
        ax = plt.gca()
        
    ax.contour(arrGridX1, arrGridX2, arrGridY, levels=1, colors='k')  
    ax.contourf(arrGridX1, arrGridX2, arrGridY, levels=1, colors=['C0', 'C1'], alpha=0.5)    
    ax.scatter(X[Y[:, 0] == 1, 0], X[Y[:, 0] == 1, 1], c='C0', s=5, alpha=0.9)
    ax.scatter(X[Y[:, 1] == 1, 0], X[Y[:, 1] == 1, 1], c='C1', s=10, alpha=0.9, marker='P')
        
    return ax