import os
import numpy as np
import anndata
class DataSplitter:
    """
    DataSplitter: A utility class for handling various data splitting tasks.

    This class provides static and class methods for splitting datasets
    into train, test, validation sets, and performing other data partitioning tasks
    like random selection, and even-odd based splits. The class also includes methods
    for saving the partitioned data.

    Note: This class was created using chatgpt4

    Methods:
    --------
    get_random_selection(lst: list) -> tuple:
        Shuffles and splits a list into two roughly equal halves.
    get_pairs_odds_selection(lst: list) -> tuple:
        Splits a list into even and odd numbers.
    get_adata(col_vals: list, col: str, adata: AnnData) -> AnnData:
        Retrieves subset of an AnnData object based on given column values.
    split_seen_unseen(col_vals: list, col: str, adata: AnnData, method: str) -> tuple:
        Splits data into seen and unseen sets either randomly or based on even-odd.
    get_train_val_test_indices(X: np.ndarray, train_ratio: float, val_ratio: float) -> tuple:
        Splits the data into train, validation, and test indices based on provided ratios.
    create_folder(folder_path: str):
        Creates a new folder at the specified path.
    save_adata(adata: AnnData, output_path: str):
        Saves an AnnData object's content as separate files.
    get_KFolds(adata: AnnData, out_folder: str, n_splits: int, stratified: bool, stratify_cols: list):
        Divides an AnnData object into K Folds and saves each fold's train, validation, and test data.
    get_KFolds_likeSon(...) / Get_Kfolds_likeAlbert(...):
        Alternative K-fold strategies with the same signature as get_KFolds.
    check_stratification(adata, adata_train, adata_val, adata_test, stratify_cols) -> pd.DataFrame:
        Compares label distributions between the full data and its splits.

    Note:
    -----
    The class is designed primarily for use with the AnnData object, but some methods can also
    work with generic lists or numpy arrays. All randomness goes through the global NumPy
    random state, which ``__init__`` seeds.
    """

    def __init__(self, seed=42):
        """
        Initializes the DataSplitter with a random seed.

        Parameters:
        -----------
        seed : int or None, optional
            Seed passed to ``np.random.seed`` for reproducible results. Default is 42.
            Pass None to leave the global NumPy random state untouched.
        """
        self.seed = seed
        if seed is not None:
            np.random.seed(seed)

    # ------------------------------------------------------------------
    # Private helpers shared by the K-fold methods
    # ------------------------------------------------------------------
    @staticmethod
    def _combine_columns(obs, cols):
        """Join the string form of the given obs columns into one '_'-separated label per row."""
        return obs[cols].apply(lambda row: '_'.join(row.map(str)), axis=1)

    @staticmethod
    def _shuffle_observations(adata):
        """
        Return ``(X, obs)`` with rows permuted by the global NumPy RNG.

        ``obs`` is a copy that gains an ``original_index`` column holding the original
        obs index and is re-indexed 0..n-1; the caller's ``adata`` is not modified.
        """
        obs = adata.obs.copy()
        obs["original_index"] = obs.index.values
        order = np.arange(adata.X.shape[0])
        np.random.shuffle(order)
        # Fancy indexing already returns a new array, so no explicit copy of X is needed.
        return adata.X[order], obs.iloc[order].reset_index(drop=True)

    @staticmethod
    def _save_fold(out_folder, fold_no, var, train, val, test):
        """Write the ``(X, obs)`` pairs of one fold to ``<out_folder>/split_<fold_no>/{train,test,val}``."""
        splits_folder = os.path.join(out_folder, f"split_{fold_no}")
        os.makedirs(splits_folder, exist_ok=True)
        for part_name, (X_part, obs_part) in (("train", train), ("test", test), ("val", val)):
            part = anndata.AnnData(X=X_part, obs=obs_part, var=var)
            DataSplitter.save_adata(part, os.path.join(splits_folder, part_name))

    # ------------------------------------------------------------------
    # Simple list / AnnData selections
    # ------------------------------------------------------------------
    @staticmethod
    def get_random_selection(lst):
        """
        Shuffles and splits a list into two roughly equal halves.

        The input is not modified. Shuffling uses the global NumPy random state, so the
        result is reproducible after ``np.random.seed`` (or ``DataSplitter(seed=...)``).

        Parameters:
        -----------
        lst: list
            The list to be split.

        Returns:
        --------
        tuple: Two lists, each being a subset of the original list. When the length is
            odd, the second list holds the extra element.
        """
        items = list(lst)
        order = np.random.permutation(len(items))
        shuffled = [items[i] for i in order]
        half_length = len(shuffled) // 2
        return shuffled[:half_length], shuffled[half_length:]

    @staticmethod
    def get_pairs_odds_selection(lst):
        """
        Splits a list into even and odd numbers.

        Parameters:
        -----------
        lst: list
            The list containing numbers to be split.

        Returns:
        --------
        tuple: Two lists - first with even numbers and second with odd numbers from the input list.
        """
        pairs = [num for num in lst if num % 2 == 0]
        odds = [num for num in lst if num % 2 != 0]
        return pairs, odds

    @staticmethod
    def get_adata(col_vals: list, col: str, adata):
        """
        Retrieves subset of an AnnData object based on given column values.

        Parameters:
        -----------
        col_vals: list
            The values from the column based on which to subset the data.
        col: str
            The column name in the AnnData.obs to be checked.
        adata: AnnData
            The AnnData object to be subset.

        Returns:
        --------
        AnnData: New AnnData built from the matching rows of ``X`` and ``obs`` and the full ``var``.
        """
        # A plain NumPy boolean mask indexes both the matrix and the DataFrame.
        mask = adata.obs[col].isin(col_vals).to_numpy()
        return anndata.AnnData(X=adata.X[mask], obs=adata.obs[mask], var=adata.var)

    @classmethod
    def split_seen_unseen(cls, col_vals: list, col: str, adata, method='random'):
        """
        Splits data into seen and unseen sets either randomly or based on even-odd.

        The selected ids are turned into labels of the form ``"<col>-<id>"`` and rows whose
        ``adata.obs[col]`` equals one of those labels are selected.

        Parameters:
        -----------
        col_vals: list
            The ids to distribute between the seen and unseen sets.
        col: str
            The column name in the AnnData.obs to be checked.
        adata: AnnData
            The AnnData object to be subset.
        method: str, optional (default='random')
            The method to use for splitting. Choose between 'random' and 'pairs_odds'.

        Returns:
        --------
        tuple: Two AnnData objects - 'seen' and 'unseen'.

        Raises:
        -------
        ValueError: If ``method`` is neither 'random' nor 'pairs_odds'.
        """
        if method == 'random':
            seen_ids, unseen_ids = cls.get_random_selection(col_vals)
        elif method == 'pairs_odds':
            seen_ids, unseen_ids = cls.get_pairs_odds_selection(col_vals)
        else:
            raise ValueError("Invalid method provided. Choose 'random' or 'pairs_odds'.")
        seen_labels = [f"{col}-{i}" for i in seen_ids]
        unseen_labels = [f"{col}-{i}" for i in unseen_ids]
        seen_adata = cls.get_adata(seen_labels, col, adata)
        unseen_adata = cls.get_adata(unseen_labels, col, adata)
        return seen_adata, unseen_adata

    @staticmethod
    def get_train_val_test_indices(X, train_ratio=0.7, val_ratio=0.15, random_state=42):
        """
        Splits the data into train, validation, and test indices based on provided ratios.

        Parameters:
        -----------
        X: np.ndarray
            The data matrix to be split; only ``X.shape[0]`` is used.
        train_ratio: float, optional (default=0.7)
            The ratio of data to be used for training.
        val_ratio: float, optional (default=0.15)
            The ratio of data to be used for validation.
        random_state: int, optional (default=42)
            Seed forwarded to both ``train_test_split`` calls.

        Returns:
        --------
        tuple: Lists of row indices for train, validation, and test sets.
        """
        from sklearn.model_selection import train_test_split

        all_idx = list(range(X.shape[0]))
        # Only the indices are split: the permutation depends on the sample count and
        # random_state alone, so there is no need to copy X.
        train_val_idx, test_idx = train_test_split(
            all_idx,
            test_size=1 - train_ratio - val_ratio,
            random_state=random_state,
        )
        # Re-scale the validation ratio to the size of the remaining train+val pool.
        train_idx, val_idx = train_test_split(
            train_val_idx,
            test_size=val_ratio / (train_ratio + val_ratio),
            random_state=random_state,
        )
        return train_idx, val_idx, test_idx

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    @staticmethod
    def create_folder(folder_path):
        """
        Creates a new folder at the specified path.

        Nothing happens when the path already exists.

        Parameters:
        -----------
        folder_path: str
            The path of the folder to be created.
        """
        if not os.path.exists(folder_path):
            print("creating folder:", folder_path)
            # exist_ok guards against another process creating it after the check.
            os.makedirs(folder_path, exist_ok=True)

    @staticmethod
    def save_adata(adata, output_path):
        """
        Saves an AnnData object's content as separate files.

        Writes ``exprMatrix.npy`` (``adata.X``), ``geneids.csv`` (``adata.var``) and
        ``meta.csv`` (``adata.obs``) into ``output_path``, creating the folder if needed.

        Parameters:
        -----------
        adata: AnnData
            The AnnData object to be saved.
        output_path: str
            The folder path where the AnnData object's content should be saved.
        """
        DataSplitter.create_folder(output_path)
        np.save(os.path.join(output_path, 'exprMatrix.npy'), adata.X)
        adata.var.to_csv(os.path.join(output_path, 'geneids.csv'))
        adata.obs.to_csv(os.path.join(output_path, 'meta.csv'))

    # ------------------------------------------------------------------
    # K-fold strategies
    # ------------------------------------------------------------------
    @staticmethod
    def get_KFolds(adata, out_folder, n_splits=10, stratified=False, stratify_cols=None):
        """
        Splits an AnnData object into training, validation, and test sets across K Folds for cross-validation.

        It supports both stratified and non-stratified splits. Rows are shuffled once with the
        global NumPy RNG; the outer and inner splitters then run without further shuffling.

        Parameters:
        -----------
        adata: AnnData
            The data to split. It is not modified.
        out_folder: str
            Folder receiving one ``split_<k>`` sub-folder per fold.
        n_splits: int, optional (default=10)
            Number of folds; must be at least 3 (one test, one validation, one or more train).
        stratified: bool, optional (default=False)
            Use StratifiedKFold on the combination of ``stratify_cols``.
        stratify_cols: list, optional
            obs columns to stratify on; required when ``stratified`` is True.

        Returns:
        --------
        None
            The function saves the training, validation, and test datasets for each fold to the
            specified `out_folder`.

        Raises:
        -------
        ValueError: If ``stratified`` is True without ``stratify_cols``, or ``n_splits`` < 3.

        Notes:
        ------
        - The outer KFold (or StratifiedKFold) separates test from train+validation. For fold k
          (k < n_splits) the k-th split of an inner (n_splits-1)-fold splitter provides the
          validation set; the last fold reuses the first fold's test set as validation
          (rotation strategy).
        - When stratifying, the helper column ``stratify_group`` is part of the saved obs tables.
        - train fraction is (n_splits-2)/n_splits
        - test and val fractions are 1/n_splits each
        """
        from itertools import islice
        from sklearn.model_selection import KFold, StratifiedKFold

        if stratified and not stratify_cols:
            raise ValueError("stratify_cols must be specified when stratified is True.")
        if n_splits < 3:
            raise ValueError("n_splits must be at least 3 to build train, validation and test sets.")

        X_shuffled, y_shuffled = DataSplitter._shuffle_observations(adata)
        splitter_cls = StratifiedKFold if stratified else KFold
        if stratified:
            y_shuffled['stratify_group'] = DataSplitter._combine_columns(y_shuffled, stratify_cols)
        outer_labels = y_shuffled['stratify_group'] if stratified else None
        kfold_outer = splitter_cls(n_splits=n_splits, shuffle=False)

        first_test_index = None  # test rows of fold 1, reused as validation in the last fold
        print("Note: indices and inner indices are independent")
        outer_splits = kfold_outer.split(X_shuffled, outer_labels)
        for fold_no, (train_val_index, test_index) in enumerate(outer_splits, start=1):
            print(f"\nProcessing Fold {fold_no}")
            print("Train/Val indices:", train_val_index[:10])
            print("Test indices:", test_index[:10])

            X_test = X_shuffled[test_index]
            y_test = y_shuffled.iloc[test_index].reset_index(drop=True)

            if fold_no == 1:
                first_test_index = test_index

            if fold_no == n_splits:
                # Last fold: validation is the first fold's test block, train is the rest.
                train_index = np.setdiff1d(train_val_index, first_test_index)
                val_index = first_test_index
                X_train, X_val = X_shuffled[train_index], X_shuffled[val_index]
                y_train = y_shuffled.iloc[train_index].reset_index(drop=True)
                y_val = y_shuffled.iloc[val_index].reset_index(drop=True)
                print("Special case for the last fold:")
                print("Train indices:", train_index[:10])
                print("Val indices:", val_index[:10])
            else:
                X_train_val = X_shuffled[train_val_index]
                y_train_val = y_shuffled.iloc[train_val_index].reset_index(drop=True)
                inner_labels = y_train_val['stratify_group'] if stratified else None
                kfold_inner = splitter_cls(n_splits=n_splits - 1, shuffle=False)
                # Pick the fold_no-th inner split so the validation block rotates with the fold.
                inner_train_index, inner_val_index = next(
                    islice(kfold_inner.split(X_train_val, inner_labels), fold_no - 1, None)
                )
                X_train, X_val = X_train_val[inner_train_index], X_train_val[inner_val_index]
                y_train = y_train_val.iloc[inner_train_index].reset_index(drop=True)
                y_val = y_train_val.iloc[inner_val_index].reset_index(drop=True)
                print(f"Inner Fold {fold_no}")
                print("Inner Train indices:", inner_train_index[:10])
                print("Inner Val indices:", inner_val_index[:10])

            DataSplitter._save_fold(
                out_folder, fold_no, adata.var,
                train=(X_train, y_train), val=(X_val, y_val), test=(X_test, y_test),
            )

    @staticmethod
    def get_KFolds_likeSon(adata, out_folder, n_splits=10, stratified=False, stratify_cols=None):
        """
        K-fold variant that shuffles in the outer splitter and always validates on the first inner fold.

        Rows are shuffled once, then an outer (Stratified)KFold with ``shuffle=True`` yields the
        test set of each fold. The first split of an un-shuffled inner (n_splits-1)-fold splitter
        divides the remainder into train and validation. Each fold is saved under
        ``<out_folder>/split_<k>``.

        Parameters:
        -----------
        adata: AnnData
            The data to split. It is not modified.
        out_folder: str
            Folder receiving one ``split_<k>`` sub-folder per fold.
        n_splits: int, optional (default=10)
            Number of outer folds; must be at least 3.
        stratified: bool, optional (default=False)
            Use StratifiedKFold on the combination of ``stratify_cols``.
        stratify_cols: list, optional
            obs columns to stratify on; required when ``stratified`` is True.

        Raises:
        -------
        ValueError: If ``stratified`` is True without ``stratify_cols``, or ``n_splits`` < 3.
        """
        from sklearn.model_selection import KFold, StratifiedKFold

        if stratified and not stratify_cols:
            raise ValueError("stratify_cols must be specified when stratified is True.")
        if n_splits < 3:
            raise ValueError("n_splits must be at least 3 to build train, validation and test sets.")

        # 1. Reshuffle data (on copies)
        X_shuffled, y_shuffled = DataSplitter._shuffle_observations(adata)

        # 2. Define outer KFold; shuffle stays on even though the rows were shuffled above
        splitter_cls = StratifiedKFold if stratified else KFold
        if stratified:
            y_shuffled['stratify_group'] = DataSplitter._combine_columns(y_shuffled, stratify_cols)
        outer_labels = y_shuffled['stratify_group'] if stratified else None
        kfold_outer = splitter_cls(n_splits=n_splits, shuffle=True)

        # 3. Split data in each fold
        outer_splits = kfold_outer.split(X_shuffled, outer_labels)
        for fold_no, (train_val_index, test_index) in enumerate(outer_splits, start=1):
            print(f"Processing Fold {fold_no}")
            X_train_val = X_shuffled[train_val_index]
            y_train_val = y_shuffled.iloc[train_val_index].reset_index(drop=True)
            X_test = X_shuffled[test_index]
            y_test = y_shuffled.iloc[test_index].reset_index(drop=True)

            # The inner splitter is not shuffled again, to avoid train/val leakage across folds.
            kfold_inner = splitter_cls(n_splits=n_splits - 1, shuffle=False)
            inner_labels = y_train_val['stratify_group'] if stratified else None
            train_index, val_index = next(kfold_inner.split(X_train_val, inner_labels))

            X_train, X_val = X_train_val[train_index], X_train_val[val_index]
            y_train, y_val = y_train_val.iloc[train_index], y_train_val.iloc[val_index]

            # 4. Saving the split data
            DataSplitter._save_fold(
                out_folder, fold_no, adata.var,
                train=(X_train, y_train), val=(X_val, y_val), test=(X_test, y_test),
            )

    @staticmethod
    def generate_fold_indices(X, n_splits=5):
        """
        Return the test indices of each fold of a shuffled KFold over ``X``.

        No ``random_state`` is given, so the shuffle draws from the global NumPy random state.
        """
        from sklearn.model_selection import KFold

        kf = KFold(n_splits=n_splits, shuffle=True)
        return [test_index for _, test_index in kf.split(X)]

    @staticmethod
    def generate_stratified_fold_indices(X, y, n_splits=5):
        """
        Return the test indices of each fold of a shuffled StratifiedKFold over ``X`` with labels ``y``.

        No ``random_state`` is given, so the shuffle draws from the global NumPy random state.
        """
        from sklearn.model_selection import StratifiedKFold

        skf = StratifiedKFold(n_splits=n_splits, shuffle=True)
        return [test_index for _, test_index in skf.split(X, y)]

    @staticmethod
    def rotate_folds_for_cross_validation(fold_indices):
        """
        Rotate through fold indices to assign training, testing, and validation sets.

        For fold ``i`` the test set is ``fold_indices[i]``, the validation set is the next
        fold (wrapping around at the end) and the training set is the concatenation of all
        remaining folds.

        Parameters:
        -----------
        fold_indices: list of np.ndarray
            One index array per fold; at least three folds are required.

        Yields:
        -------
        tuple: ``(train_idx, test_idx, val_idx)`` for each fold, in that order.

        Raises:
        -------
        ValueError: If fewer than three folds are given (no fold would be left for training).
        """
        n_splits = len(fold_indices)
        if n_splits < 3:
            raise ValueError("At least 3 folds are required to build train, validation and test sets.")
        for test_fold in range(n_splits):
            val_fold = (test_fold + 1) % n_splits  # Wrap-around at the end
            train_folds = [
                fold for j, fold in enumerate(fold_indices)
                if j != test_fold and j != val_fold
            ]
            yield np.concatenate(train_folds), fold_indices[test_fold], fold_indices[val_fold]

    @staticmethod
    def Get_Kfolds_likeAlbert(adata, out_folder, n_splits=10, stratified=False, stratify_cols=None):
        """
        Splits an AnnData object into training, validation, and test sets across K Folds for cross-validation.

        The rows are shuffled, partitioned once into ``n_splits`` disjoint folds, and the folds
        are rotated: fold k is the test set, fold k+1 the validation set and the rest training.
        Each fold is saved under ``<out_folder>/split_<k>``.

        Parameters:
        -----------
        adata: AnnData
            The data to split. It is not modified.
        out_folder: str
            Folder receiving one ``split_<k>`` sub-folder per fold.
        n_splits: int, optional (default=10)
            Number of folds; must be at least 3.
        stratified: bool, optional (default=False)
            Use StratifiedKFold on the combination of ``stratify_cols``.
        stratify_cols: list, optional
            obs columns to stratify on; required when ``stratified`` is True.

        Raises:
        -------
        ValueError: If ``stratified`` is True without ``stratify_cols``, or ``n_splits`` < 3.
        """
        if stratified and not stratify_cols:
            raise ValueError("stratify_cols must be specified when stratified is True.")
        if n_splits < 3:
            raise ValueError("n_splits must be at least 3 to build train, validation and test sets.")

        # Shuffle the rows; the copy keeps the caller's object untouched.
        order = np.arange(adata.shape[0])
        np.random.shuffle(order)
        adata_shuffled = adata[order].copy()
        adata_shuffled.obs["original_index"] = adata_shuffled.obs.index.values
        X = adata_shuffled.X
        obs = adata_shuffled.obs

        if stratified:
            # e.g. combining 'donor' and 'celltype' into a single label
            labels = DataSplitter._combine_columns(obs, stratify_cols)
            fold_indices = DataSplitter.generate_stratified_fold_indices(X, labels, n_splits=n_splits)
        else:
            fold_indices = DataSplitter.generate_fold_indices(X, n_splits=n_splits)

        rotation = DataSplitter.rotate_folds_for_cross_validation(fold_indices)
        for fold_no, (train_idx, test_idx, val_idx) in enumerate(rotation, start=1):
            print(f"\nProcessing Fold {fold_no}")
            # var is passed along so geneids.csv keeps the gene table, as in the other K-fold methods.
            DataSplitter._save_fold(
                out_folder, fold_no, adata_shuffled.var,
                train=(X[train_idx], obs.iloc[train_idx]),
                val=(X[val_idx], obs.iloc[val_idx]),
                test=(X[test_idx], obs.iloc[test_idx]),
            )

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------
    @staticmethod
    def check_stratification(adata: "anndata.AnnData", adata_train: "anndata.AnnData",
                             adata_val: "anndata.AnnData", adata_test: "anndata.AnnData",
                             stratify_cols: list):
        """
        Check if the training, validation, and test data are stratified based on the given columns.

        None of the passed objects is modified.

        Parameters:
        -----------
        adata : anndata.AnnData
            The original AnnData object.
        adata_train : anndata.AnnData
            The AnnData object for the training set.
        adata_val : anndata.AnnData
            The AnnData object for the validation set.
        adata_test : anndata.AnnData
            The AnnData object for the test set.
        stratify_cols : list
            The columns in the 'obs' DataFrame to check stratification for.

        Returns:
        --------
        comparison : pd.DataFrame
            A DataFrame showing the distribution of combined values in the stratify_cols for the original,
            training, validation, and test datasets. Categories missing from a subset are reported as 0.
        """
        import pandas as pd

        def distribution(obs):
            # Relative frequency of each combined label in one obs table.
            labels = DataSplitter._combine_columns(obs, stratify_cols).rename('combined_stratify_col')
            return labels.value_counts(normalize=True)

        comparison = pd.DataFrame({
            'Original': distribution(adata.obs),
            'Train': distribution(adata_train.obs),
            'Validation': distribution(adata_val.obs),
            'Test': distribution(adata_test.obs),
        }).fillna(0)  # Fill NA values with 0, in case a category isn't present in a subset
        return comparison