import gc
import os
import re

import numpy as np
import pandas as pd
import scanpy as sc
from anndata import AnnData
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

from scMEDAL.utils.utils import (
    get_split_paths,
    read_adata,
    min_max_scaling,
    plot_rep,
    calculate_zscores,
)
from scMEDAL.utils.utils_load_model import get_latest_checkpoint


# Glob does not work well with strange characters, so mimic it with os + re.
def glob_like(directory, pattern):
    """Mimic glob.glob() using the os and re modules."""
    regex_pattern = re.compile(pattern)
    files = os.listdir(directory)
    matched_files = []
    for file in files:
        if regex_pattern.search(file):
            matched_files.append(file)
    return [os.path.join(directory, file) for file in matched_files]


def get_recon_paths_df(results_path_dict, get_batch_recon_paths=False, k_folds=5):
    # Initialize an empty list to store data
    data = []
    # Patterns to search for with corresponding type
    patterns = [('recon_train*', 'train'), ('recon_val*', 'val'), ('recon_test*', 'test')]
    if get_batch_recon_paths:
        patterns += [('recon_batch_train*', 'train'), ('recon_batch_val*', 'val'), ('recon_batch_test*', 'test')]

    # Iterate over all the keys and splits
    for key, path in results_path_dict.items():
        for split in range(1, k_folds + 1):
            # Directory to search in
            directory = f"{path}/splits_{split}"
            if os.path.exists(directory):
                for pattern, data_type in patterns:
                    # Use glob_like to find files matching the pattern
                    files = glob_like(directory, pattern)
                    # Append the results to the data list
                    for file in files:
                        data.append({'Key': key, 'Split': split, 'ReconPath': file, 'Type': data_type})

    # Create a DataFrame from the data list
    df = pd.DataFrame(data)
    return df


def get_latent_paths_df(results_path_dict, k_folds=5):
    data = []
    # Simplified patterns to search for
    patterns = [('latent.*train', 'train'), ('latent.*val', 'val'), ('latent.*test', 'test')]

    for key, path in results_path_dict.items():
        for split in range(1, k_folds + 1):
            directory = f"{path}/splits_{split}"
            # print(f"Checking directory: {directory}")
            if os.path.exists(directory):
                for pattern, data_type in patterns:
                    files = glob_like(directory, pattern)
                    # print(f"Pattern: {pattern}, Files: {files}")
                    for file in files:
                        data.append({'Key': key, 'Split': split, 'LatentPath': file, 'Type': data_type})
            else:
                print(f"Directory does not exist: {directory}")

    df = pd.DataFrame(data)
    return df

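
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original pipeline). The model
# names and base paths below are placeholders; the only assumption is the
# "<base>/splits_<k>" directory layout that the helpers above expect.
# ---------------------------------------------------------------------------
def _example_collect_result_paths():
    results_path_dict = {
        "AE": "/results/AE",              # hypothetical output folders
        "scMEDAL": "/results/scMEDAL",
    }
    df_recon = get_recon_paths_df(results_path_dict, get_batch_recon_paths=False, k_folds=5)
    df_latent = get_latent_paths_df(results_path_dict, k_folds=5)
    # Each row holds: Key (model), Split (fold), the file path, and Type (train/val/test).
    return df_recon, df_latent
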
def create_latent_dict_from_df(df_latent, model_col="Key", fold_col="Split", dataset_col="Type", path_col="LatentPath"):
    """
    Create a nested dictionary from a DataFrame to structure latent paths.

    Parameters:
    -----------
    df_latent : pd.DataFrame
        A DataFrame containing columns that specify the model, split (or fold), dataset type,
        and the corresponding latent path.
    model_col : str, optional (default="Key")
        The column name in df_latent representing the model identifiers.
    fold_col : str, optional (default="Split")
        The column name in df_latent representing the split or fold identifiers.
    dataset_col : str, optional (default="Type")
        The column name in df_latent representing the dataset type (e.g., 'train', 'test').
    path_col : str, optional (default="LatentPath")
        The column name in df_latent representing the paths or values to be structured in the dictionary.

    Returns:
    --------
    dict
        A dictionary where the keys are model names, and the values are nested dictionaries structured as:
        latent_dict[model]["splits_" + str(fold)][dataset] = path or value.

    Example:
    --------
    >>> df_latent = pd.DataFrame({
    ...     'Key': ['model1', 'model1', 'model2'],
    ...     'Split': [1, 2, 1],
    ...     'Type': ['train', 'test', 'train'],
    ...     'LatentPath': ['/path/to/model1_fold1_train', '/path/to/model1_fold2_test', '/path/to/model2_fold1_train']
    ... })
    >>> latent_dict = create_latent_dict_from_df(df_latent)
    >>> print(latent_dict)
    {
        'model1': {
            'splits_1': {'train': '/path/to/model1_fold1_train'},
            'splits_2': {'test': '/path/to/model1_fold2_test'}
        },
        'model2': {
            'splits_1': {'train': '/path/to/model2_fold1_train'}
        }
    }
    """
    latent_dict = {}

    for _, row in df_latent.iterrows():
        model = row[model_col]
        fold = row[fold_col]
        dataset = row[dataset_col]
        value = row[path_col]

        # Initialize nested dictionaries if not already present
        if model not in latent_dict:
            latent_dict[model] = {}
        if "splits_" + str(fold) not in latent_dict[model]:
            latent_dict[model]["splits_" + str(fold)] = {}

        # Assign the value to the correct place in the dictionary
        latent_dict[model]["splits_" + str(fold)][dataset] = value

    return latent_dict


def get_input_paths_df(input_base_path, k_folds=5, eval_test=False):
    """
    Generate a DataFrame containing the input paths for each fold in a k-fold cross-validation setup.

    Parameters:
    -----------
    input_base_path : str
        The base directory path where the input data is stored.
    k_folds : int, optional
        The number of folds in the k-fold cross-validation (default is 5).
    eval_test : bool, optional
        If True, includes all data types (including 'test') in the DataFrame.
        If False, excludes the 'test' data type (default is False). The test split is not used
        during model training, which is why the default is False.

    Returns:
    --------
    pd.DataFrame
        A DataFrame with columns 'Split', 'Type', and 'InputsPath', where:
        - 'Split' indicates the fold number.
        - 'Type' indicates the type of data (e.g., 'train', 'validation', 'test').
        - 'InputsPath' contains the directory path for the corresponding fold and data type.

    Example:
    --------
    >>> df = get_input_paths_df("/path/to/data", k_folds=5, eval_test=True)
    >>> print(df.head())
       Split        Type                       InputsPath
    0      1       train       /path/to/data/fold_1/train
    1      1  validation  /path/to/data/fold_1/validation
    2      1        test        /path/to/data/fold_1/test
    3      2       train       /path/to/data/fold_2/train
    4      2  validation  /path/to/data/fold_2/validation

    Notes:
    ------
    - The function assumes that the directory structure for each fold and data type is already created and accessible.
    - If `eval_test` is set to False, the 'test' data type will be excluded from the resulting DataFrame.
    """
    # Initialize an empty list to store data
    data = []

    # Iterate over all folds
    for fold in range(1, k_folds + 1):
        # Get input paths for the current fold
        input_path_dict = get_split_paths(base_path=input_base_path, fold=fold)
        # print(f"Folder split: {input_path_dict}")
        for data_type, directory in input_path_dict.items():
            # If eval_test is True, append all data types.
            # If eval_test is False, append only non-'test' data types that exist on disk.
            if eval_test or (data_type != 'test' and os.path.exists(directory)):
                data.append({'Split': fold, 'Type': data_type, 'InputsPath': directory})

    # Create a DataFrame from the data list
    df = pd.DataFrame(data)
    return df

def get_model_paths_df(results_path_dict, k_folds=5):
    """
    Creates a DataFrame containing the paths to the latest checkpoint and model parameters for each key and split.

    Args:
        results_path_dict (dict): A dictionary where keys are identifiers and values are base directories
            to search for checkpoints.
        k_folds (int): Number of folds. This is equal to the number of saved models.

    Returns:
        pandas.DataFrame: A DataFrame with columns 'Key', 'Split', 'last_checkpoint', 'model_params',
        and 'config' containing the relevant paths.

    Notes:
        The function searches for checkpoint files and model parameter files in subdirectories named
        'splits_<split>' under each path provided in results_path_dict. It considers up to k_folds splits.
    """
    # Initialize an empty list to store data
    data = []

    # Iterate over all the keys and splits
    for key, path in results_path_dict.items():
        for split in range(1, k_folds + 1):
            # Directory to search in
            directory = f"{path}/splits_{split}"
            if os.path.exists(directory):
                file = get_latest_checkpoint(directory)
                model_params_file = os.path.join(directory, 'model_params.yaml')
                config_file = os.path.join(directory, 'model_config.py')
                data.append({'Key': key, 'Split': split, 'last_checkpoint': file,
                             'model_params': model_params_file, 'config': config_file})

    # Create a DataFrame from the data list
    df = pd.DataFrame(data)
    return df


def filter_models_by_type_and_split(df, models_dict, Type='train'):
    """
    Filters the DataFrame for the specified models, the given data type, and the given splits.

    Parameters:
        df (pd.DataFrame): The input DataFrame containing the following columns:
            - 'Key': The name of the model.
            - 'Type': The type of data (e.g., 'train', 'val', 'test').
            - 'Split': The split identifier.
        models_dict (dict): Dictionary specifying models and the corresponding splits to filter.
            The keys are the model names (values in the 'Key' column), and the values are the
            split numbers (values in the 'Split' column).
        Type (str): The type of data to filter on (default is 'train').

    Returns:
        pd.DataFrame: The filtered DataFrame containing only the rows that match the specified model names,
        have 'Type' equal to the requested type, and match the corresponding split number.
    """
    filtered_df = pd.DataFrame()
    for model, split in models_dict.items():
        temp_df = df[(df['Key'] == model) & (df['Type'] == Type) & (df['Split'] == split)]
        filtered_df = pd.concat([filtered_df, temp_df], ignore_index=True)
    return filtered_df

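
# Illustrative sketch (model names and fold choices are placeholders): pick one
# fold per model from a path table such as the one built by get_latent_paths_df,
# which carries the 'Key', 'Split', and 'Type' columns that the filter expects.
def _example_select_folds(df_latent):
    models_dict = {"AE": 2, "scMEDAL": 4}   # hypothetical fold chosen per model
    return filter_models_by_type_and_split(df_latent, models_dict, Type="val")
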
""" # Compute the variance of each gene (column) across all cells (rows) gene_variances = np.var(adata.X, axis=0) # Create a DataFrame with gene names and their variances variances_df = pd.DataFrame({ 'gene': adata.var_names, 'variance': gene_variances }) # Sort the DataFrame by variance in descending order to determine ranks variances_df_sorted = variances_df.sort_values(by='variance', ascending=False).reset_index(drop=True) # Add rank column based on sorted variances variances_df_sorted['rank'] = np.arange(1, len(variances_df_sorted) + 1) # Merge back to original DataFrame to preserve original gene order variances_df = variances_df.merge(variances_df_sorted[['gene', 'rank']], on='gene', how='left') if num_genes_to_retain is not None: # Flag the top high variable genes variances_df['high_variable'] = variances_df['rank'] <= num_genes_to_retain else: # Add the column without flagging any genes variances_df['high_variable'] = False # Store the DataFrame in the AnnData object adata.uns['gene_variances'] = variances_df return adata # Define some functions to preprocess df and get the same format for all of them # functions to homogenize the results def aggregate_paths(results_path_dict, pattern = f'mean_scores_test_samplesize'): # Initialize a dictionary to store paths paths_dict = {} # Populate paths_dict with model names and corresponding file paths for model_name, path in results_path_dict.items(): print(model_name, path) paths_dict[model_name] = glob_like(path, pattern) # Initialize an empty DataFrame to store all the data df_all_paths = pd.DataFrame() # Iterate over the dictionary items for model_name, files in paths_dict.items(): # Extract the sample size from each file path sample_size = [file.split("samplesize-")[-1].split(".csv")[0] for file in files] # Create a DataFrame for the current batch of files df = pd.DataFrame({"sample_size": sample_size, "path": files}) df['model_name'] = model_name # Add a column for the model name # Append the current DataFrame to the aggregated DataFrame df_all_paths = df_all_paths.append(df, ignore_index=True) # Ensure to reassign and use ignore_index=True # Now df_all_paths contains all the data combined from the different files and models return df_all_paths def read_and_aggregate_scores(df_all_paths): """ Reads CSV files from the paths listed in the provided DataFrame and aggregates the data into a single DataFrame. Parameters: df_all_paths (pd.DataFrame): DataFrame containing columns 'sample_size', 'path', and 'model_name'. Returns: pd.DataFrame: A DataFrame containing all the data from the CSV files specified in the 'path' column. 
""" # Initialize an empty DataFrame to store all scores df_all_paths_allscores = pd.DataFrame() # Iterate over each row in the DataFrame for index, row in df_all_paths.iterrows(): print(row) # Read the CSV file df = pd.read_csv(row['path'], header=[0, 1],index_col=0) # Rename 'Unnamed: 7_level_1' and 'Unnamed: 8_level_1' columns to empty strings df.rename(columns=lambda x: '' if 'Unnamed' in x else x, level=1, inplace=True) # Reset the index to avoid issues when appending df.reset_index(inplace=True) # Add additional information from the original DataFrame df['sample_size'] = row['sample_size'] # df['model_name'] = row['model_name'] # Append the current DataFrame to the aggregated DataFrame df_all_paths_allscores = df_all_paths_allscores.append(df, ignore_index=True) df_all_paths_allscores["latent_name"] = [s.split("_latent_")[0] for s in df_all_paths_allscores["dataset_type"]] return df_all_paths_allscores def filter_min_max_silhouette_scores(df,batch_col='batch'): """ Filters rows with the minimum and maximum silhouette scores for each dataset type, considering the column MultiIndex structure. Parameters: df (pd.DataFrame): DataFrame containing all scores with columns including 'silhouette' under 'batch' and 'celltype' levels, as well as other relevant columns. batch_col (str): String with name of the columns that containes the batch scores. Default:'batch' Returns: tuple: A tuple containing two DataFrames: one for minimum silhouette scores and one for maximum silhouette scores. """ # Initialize DataFrames for storing the min and max silhouette scores df_min_silhouette = pd.DataFrame(columns=df.columns) df_max_silhouette = pd.DataFrame(columns=df.columns) # Iterate over each unique dataset type for dataset_type in df[('dataset_type', '')].unique(): # Filter the DataFrame for the current dataset type df_filtered = df[df[('dataset_type', '')] == dataset_type] if not df_filtered.empty: # Find the row with the minimum silhouette score min_row = df_filtered.loc[df_filtered[(batch_col, 'silhouette')].idxmin()] df_min_silhouette = df_min_silhouette.append(min_row, ignore_index=True) # Find the row with the maximum silhouette score max_row = df_filtered.loc[df_filtered[(batch_col, 'silhouette')].idxmax()] df_max_silhouette = df_max_silhouette.append(max_row, ignore_index=True) return df_min_silhouette, df_max_silhouette def process_all_results(df_all_paths, models2process_dict, out_name, dataset_type): """ Process and merge results from different models and sample sizes into a single DataFrame, and save to CSV. Parameters: df_all_paths (pd.DataFrame): DataFrame containing columns 'sample_size', 'model_name', and 'path'. models2process_dict (dict): Dictionary indicating the processing type for each model. out_name (str): Directory path where the processed results will be saved. dataset_type (str): Type of dataset being processed. Returns: dict: Dictionary with sample sizes as keys and their corresponding processed DataFrames as values. 
""" df_sample_size = {} print(df_all_paths) # Loop over each unique sample size for sample_size in np.unique(df_all_paths["sample_size"]): print("sample size", sample_size) model_path_dict = {} # DataFrames for single model format and PCA format df_smf = pd.DataFrame() df_pcaf = pd.DataFrame() count_i = 0 for model_name in np.unique(df_all_paths["model_name"]): # Get the results file path print(model_name) file_paths = df_all_paths.loc[(df_all_paths["sample_size"] == sample_size) & (df_all_paths["model_name"] == model_name), "path"].values if len(file_paths) > 0: file_path = file_paths[0] print(model_name, "file path", file_path) model_path_dict[model_name] = file_path # Process models without PCA results if models2process_dict[model_name] == "process_single_model_format": df_smf = df_smf.append(process_single_model_format(file_path=model_path_dict[model_name], model_name=model_name)) print("smf", df_smf) # Process models with PCA results elif models2process_dict[model_name] == "preprocess_results_model_pca_format": df_pcaf_i = preprocess_results_model_pca_format(model_path_dict[model_name], columns_to_drop=['fold', 'sem_fold']) df_pcaf_i = df_pcaf_i.rename(columns={"X_pca_val": "X_pca_val_" + str(count_i)}) df_pcaf = pd.concat([df_pcaf, df_pcaf_i], axis=1) count_i += 1 else: print(model_name, "not processed") # Merge PCA format and single model format DataFrames df_all = pd.concat([df_pcaf, df_smf.T], axis=1).T # Save the DataFrame to CSV df_all.to_csv(os.path.join(out_name, f"{dataset_type}_scores_{sample_size}.csv")) print(df_all) df_sample_size[sample_size] = df_all return df_sample_size def process_confidence_intervals(df_all, out_name, dataset_type, sample_size, dof=4): """ Calculate 95% CI for the DataFrame, intercalate mean and CI columns, and save to a CSV file. Parameters: df_all (pd.DataFrame): DataFrame containing all scores with columns including 'mean' and 'CI' values. out_name (str): Directory path where the processed results will be saved. dataset_type (str): Type of dataset being processed. sample_size (int): Sample size for the current processing batch. dof (int): Degrees of freedom for calculating the CI. Default is 4 (n=5, dof=n-1=4). Returns: pd.DataFrame: DataFrame with intercalated mean and CI columns. """ # Calculate and append 95% CI df_ci = calculate_and_append_ci(df_all, dof=dof) # Extract only mean and CI columns mean_ci_columns = [col for col in df_ci.columns if 'mean' in col[2] or 'CI' in col[2]] df_mean_ci = df_ci[mean_ci_columns] # Intercalate the mean and CI columns intercalated_columns = [] for level_0 in df_ci.columns.levels[0]: for level_1 in df_ci.columns.levels[1]: mean_col = (level_0, level_1, 'mean') ci_lower_col = (level_0, level_1, 'CI_lower') ci_upper_col = (level_0, level_1, 'CI_upper') if mean_col in df_mean_ci.columns: intercalated_columns.append(mean_col) if ci_lower_col in df_mean_ci.columns: intercalated_columns.append(ci_lower_col) if ci_upper_col in df_mean_ci.columns: intercalated_columns.append(ci_upper_col) # Reorder the DataFrame columns based on intercalated columns df_mean_ci = df_mean_ci[intercalated_columns] # Save the DataFrame with 95% CI to a CSV file output_path = os.path.join(out_name, f"{dataset_type}_scores_{sample_size}_95CI.csv") df_mean_ci.to_csv(output_path) return df_mean_ci def preprocess_results_model_pca_format(file_path, columns_to_drop): """ Load data from a CSV file, clean and transpose the DataFrame. This is for a experiment results format where get_pca=True Parameters: - file_path: str, the path to the CSV file. 
def preprocess_results_model_pca_format(file_path, columns_to_drop):
    """
    Load data from a CSV file, then clean and transpose the DataFrame.
    This is for an experiment results format where get_pca=True.

    Parameters:
        - file_path: str, the path to the CSV file.
        - columns_to_drop: list of str, the names of columns to drop from level 1 of the MultiIndex.

    Returns:
        - pd.DataFrame, the transposed and cleaned DataFrame.
    """
    print("reading file:", file_path)
    # Load the CSV file into a DataFrame with specified MultiIndex for rows and columns
    df = pd.read_csv(file_path, header=[0, 1, 2], index_col=[0])

    # Drop specified columns from level 1 of the MultiIndex in the DataFrame
    for column in columns_to_drop:
        df = df.drop(labels=column, axis=1, level=1)

    # Remove 'sem_' prefix from levels 1 and 2 of the column names
    new_columns = [(level0, level1.replace('sem_', ''), level2.replace('sem_', ''))
                   for level0, level1, level2 in df.columns]
    df.columns = pd.MultiIndex.from_tuples(new_columns)

    # Transpose the DataFrame and adjust the MultiIndex accordingly
    df_transposed = df.T
    new_index = [(level1, level2, level0) for level0, level1, level2 in df_transposed.index]
    df_transposed.index = pd.MultiIndex.from_tuples(new_index)

    return df_transposed


def process_single_model_format(file_path, model_name):
    """
    Process the results file to:
    1. Remove the 'fold' column.
    2. Convert the DataFrame back and forth to a dictionary to reformat it into multi-level columns.
    """
    # Convert DataFrame to dictionary and back to DataFrame with multi-level columns
    print("reading file:", file_path)
    df = pd.read_csv(file_path, header=[0], index_col=[0, 1]).T
    del df['fold']
    data = df.to_dict()

    # Create a DataFrame with multi-level columns
    columns = pd.MultiIndex.from_tuples([(key[0], key[1], stat) for key in data for stat in data[key]])
    values = [data[key][stat] for key in data for stat in data[key]]

    # Recreate DataFrame with multi-level columns
    new_df = pd.DataFrame([values], columns=columns)
    new_df["dataset_type"] = model_name
    new_df.set_index("dataset_type", inplace=True)

    return new_df

def calculate_and_append_ci(df, dof):
    """
    Calculates 95% confidence intervals and appends them to df.

    Args:
        df: DataFrame with clustering scores and 3-level columns:
            1st level: batch, celltype; 2nd level: 1/db, ch, silhouette; 3rd level: mean, sem, std.
        dof (int): degrees of freedom for the confidence intervals.

    Returns:
        df with the CI_lower and CI_upper columns appended.
    """
    import scipy.stats as stats

    # Function to calculate the 95% confidence interval
    def get_CI(df, dof, mean_col, sem_col):
        # Critical t-value for 95% confidence interval
        t_critical = stats.t.ppf(1 - 0.025, dof)

        # Calculate the margin of error
        df['margin_of_error'] = df[sem_col] * t_critical

        # Calculate the confidence intervals
        df[(mean_col[0], mean_col[1], 'CI_lower')] = df[mean_col] - df['margin_of_error']
        df[(mean_col[0], mean_col[1], 'CI_upper')] = df[mean_col] + df['margin_of_error']

        return df[[(mean_col[0], mean_col[1], 'CI_lower'), (mean_col[0], mean_col[1], 'CI_upper')]]

    # Prepare to store CI columns
    ci_columns = []

    # Iterate through each level of the MultiIndex to calculate confidence intervals
    for level_0 in df.columns.levels[0]:
        for level_1 in df.columns.levels[1]:
            try:
                # Filter the DataFrame for the given level combination
                mean_col = (level_0, level_1, 'mean')
                sem_col = (level_0, level_1, 'sem')
                if mean_col in df.columns and sem_col in df.columns:
                    df_subset = df[[mean_col, sem_col]]
                    ci_df = get_CI(df_subset.copy(), dof, mean_col, sem_col)
                    ci_columns.append(ci_df)
            except KeyError:
                # Skip if the combination of levels does not exist in the data
                continue

    # Concatenate the original DataFrame with the new CI columns
    ci_df = pd.concat(ci_columns, axis=1)
    final_df = pd.concat([df, ci_df], axis=1)

    return final_df


def filter_adata_by_batch(adata_input, n_batches_sample, batch_col="batch"):
    """
    Filter an AnnData object by a list of batch values.

    Parameters:
        adata_input (anndata.AnnData): The input AnnData object.
        n_batches_sample (list): A list of batch values to filter by.
        batch_col (str): The column name in obs to filter by.

    Returns:
        anndata.AnnData: A new AnnData object with filtered obs and X.
    """
    # Filter the obs DataFrame based on the specified batch values
    filtered_obs = adata_input.obs[adata_input.obs[batch_col].isin(n_batches_sample)]

    # Get the indices of the filtered rows
    filtered_indices = filtered_obs.index.astype(int)

    # Filter the X matrix using the filtered indices
    filtered_X = adata_input.X[filtered_indices, :]

    # Create a new AnnData object with the filtered data
    filtered_adata = AnnData(X=filtered_X, obs=filtered_obs, var=adata_input.var)

    return filtered_adata

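
# Worked sketch of the interval arithmetic used above: with 5 folds the degrees
# of freedom are dof = 4, t_0.975 ≈ 2.776, so CI = mean ± 2.776 * SEM. The mean
# and SEM values here are made up for illustration.
def _example_ci_margin(mean=0.50, sem=0.02, dof=4):
    import scipy.stats as stats
    t_critical = stats.t.ppf(1 - 0.025, dof)                     # ≈ 2.776 for dof = 4
    return mean - sem * t_critical, mean + sem * t_critical      # ≈ (0.44, 0.56)
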
""" print("\nComputing UMAP for the following files:") # Get unique input prefixes input_prefixes = np.unique(df["input_prefix"]) batches_sample = [] # Set the random seed for reproducibility np.random.seed(42) for i, input_prefix in enumerate(input_prefixes): # Get the path for the current input prefix input_path = df.loc[df["input_prefix"] == input_prefix, "InputsPath"].values[0] print(input_prefix, input_path) # Read input data X, var, obs = read_adata(input_path, issparse = issparse) if issparse: X = X.toarray() # Apply scaling to X based on the scaling parameter if scaling == "min_max": X = min_max_scaling(X) elif scaling == "z_scores": X = calculate_zscores(X) # Subset obs to a random sample size if sample_size is not None: sampled_indices = np.random.choice(X.shape[0], sample_size, replace=False) X = X[sampled_indices, :] obs = obs.iloc[sampled_indices].reset_index(drop=True) # Compute UMAP for the input data adata_input = AnnData(X, obs=obs, var=var) sc.pp.neighbors(adata_input, n_neighbors=n_neighbors) sc.tl.umap(adata_input) # Save UMAP results to CSV umap_df = pd.DataFrame(adata_input.obsm["X_umap"], columns=["UMAP1", "UMAP2"]) umap_df = pd.concat([umap_df, obs], axis=1) umap_df.to_csv(os.path.join(umap_path, f"umap_input_{input_prefix}_{sample_size}cells.csv"), index=False) # Plot UMAP results for input data plot_rep(adata_input, outpath=umap_path, file_name=f"input_{input_prefix}_{sample_size}cells", **plot_params) # Plot batch subsample # If first iteration, determine batches to sample if i == 0 and n_batches_sample is not None: batches = np.unique(obs[batch_col]) batches = batches.tolist() batches_sample = np.random.choice(batches, size=n_batches_sample, replace=False) print("batches sample for plotting",batches_sample) # Modify plot parameters to plot by batch plot_params_batch_col = plot_params.copy() plot_params_batch_col.update({ "shape_col": batch_col, "color_col": batch_col }) if n_batches_sample is not None: # Filter data by batches and calculate UMAP adata_input = filter_adata_by_batch(adata_input, batches_sample, batch_col=batch_col) sc.pp.neighbors(adata_input, use_rep="X", n_neighbors=n_neighbors) sc.tl.umap(adata_input) # Save UMAP results for batch-filtered data to CSV umap_df = pd.DataFrame(adata_input.obsm["X_umap"], columns=["UMAP1", "UMAP2"]) umap_df = pd.concat([umap_df, adata_input.obs.reset_index(drop=True)], axis=1) umap_df.to_csv(os.path.join(umap_path, f"umap_input_{sample_size}cells_{n_batches_sample}{batch_col}.csv"), index=False) # Plot UMAP results for batch-filtered data plot_rep(adata_input, outpath=umap_path, file_name=f"input_{input_prefix}_{sample_size}cells_{n_batches_sample}{batch_col}_biocolors", **plot_params) plot_rep(adata_input, outpath=umap_path, file_name=f"input_{input_prefix}_{sample_size}cells_{n_batches_sample}{batch_col}_batchcolors", **plot_params_batch_col) # Get latent prefixes and paths for the current input prefix latent_prefixes = df.loc[df["input_prefix"] == input_prefix, "latent_prefix"].values latent_paths = df.loc[df["input_prefix"] == input_prefix, "LatentPath"].values for latent_pref, latent_path in zip(latent_prefixes, latent_paths): print(latent_pref, latent_path) # Load latent data X = np.load(latent_path) if sample_size is not None: X = X[sampled_indices, :] adata = AnnData(X, obs=obs) # Calculate UMAP for latent data sc.pp.neighbors(adata, use_rep="X", n_neighbors=n_neighbors) sc.tl.umap(adata) # Save UMAP results to CSV umap_df = pd.DataFrame(adata.obsm["X_umap"], columns=["UMAP1", "UMAP2"]) umap_df = 
class DimensionalityReductionProcessor:
    def __init__(self, df, output_path, plot_params, sample_size=10000, n_neighbors=15, scaling="min_max",
                 n_batches_sample=20, batch_col="batch", plot_tsne=False, n_pca_components=50, min_dist=0.5):
        self.df = df
        self.output_path = output_path
        self.plot_params = plot_params
        self.sample_size = sample_size
        self.n_neighbors = n_neighbors
        self.min_dist = min_dist
        self.scaling = scaling
        self.n_batches_sample = n_batches_sample
        self.batch_col = batch_col
        self.plot_tsne = plot_tsne
        self.n_pca_components = n_pca_components
        self.batches_sample = None

    def apply_scaling(self, X):
        if self.scaling == "min_max":
            return min_max_scaling(X)
        elif self.scaling == "z_scores":
            return calculate_zscores(X)
        return X

    def subset_data(self, X, obs):
        if self.sample_size is not None:
            sampled_indices = np.random.choice(X.shape[0], self.sample_size, replace=False)
            X = X[sampled_indices, :]
            obs = obs.iloc[sampled_indices].reset_index(drop=True)
            self.sampled_indices = sampled_indices
        return X, obs

    def compute_pca(self, X):
        pca = PCA(n_components=self.n_pca_components)
        X_pca = pca.fit_transform(X)
        return X_pca

    def compute_and_save_umap(self, X, obs, prefix, name=None):
        use_rep = "X_pca" if 'input' in prefix else "X"
        adata = AnnData(X, obs=obs)
        if use_rep == "X_pca":
            X = self.compute_pca(X)
            adata.obsm["X_pca"] = X
        sc.pp.neighbors(adata, n_neighbors=self.n_neighbors, use_rep=use_rep)
        sc.tl.umap(adata, min_dist=self.min_dist)
        self.save_results(adata, prefix, "umap", name=name)

    def compute_and_save_tsne(self, X, obs, prefix, name=None):
        use_rep = "X_pca" if 'input' in prefix else "X"
        if use_rep == "X_pca":
            X = self.compute_pca(X)
        tsne = TSNE(n_components=2)
        X_tsne = tsne.fit_transform(X)
        adata = AnnData(X_tsne, obs=obs)
        adata.obsm["X_tsne"] = X_tsne
        self.save_results(adata, prefix, "tsne", name=name)

    def save_results(self, adata, prefix, method, name=None):
        use_rep = f"X_{method}"

        # Update plot params
        plot_params = self.plot_params.copy()
        plot_params.update({"use_rep": use_rep})
        plot_params_batch = plot_params.copy()
        plot_params_batch.update({"shape_col": self.batch_col, "color_col": self.batch_col})

        # Define base name
        file_name_base = f"{prefix}_{self.sample_size}cells"
        if name is not None:
            file_name_base = f"{file_name_base}_{name}"

        # Save the low-dimensional representation
        results_df = pd.DataFrame(adata.obsm[f"X_{method}"], columns=[f"{method.upper()}1", f"{method.upper()}2"])
        results_df = pd.concat([results_df, adata.obs], axis=1)
        results_df.to_csv(os.path.join(self.output_path, f"{method}_{file_name_base}.csv"), index=False)

        try:
            plot_rep(adata, outpath=self.output_path, file_name=file_name_base, **plot_params)
            plot_rep(adata, outpath=self.output_path, file_name=f"{file_name_base}_batch", **plot_params_batch)

            # Also plot the model latent representation directly
            if method == 'umap':
                print("\n Plotting latent rep")
                if 'input' in prefix:
                    plot_params.update({"use_rep": "X_pca"})
                    plot_rep(adata, outpath=self.output_path, file_name=f"modellatent_{file_name_base}", **plot_params)
                    plot_params_batch.update({"use_rep": "X_pca"})
                    plot_rep(adata, outpath=self.output_path, file_name=f"modellatent_{file_name_base}_batch", **plot_params_batch)
                else:
                    plot_params.update({"use_rep": "X"})
                    plot_rep(adata, outpath=self.output_path, file_name=f"modellatent_{file_name_base}", **plot_params)
                    plot_params_batch.update({"use_rep": "X"})
                    plot_rep(adata, outpath=self.output_path, file_name=f"modellatent_{file_name_base}_batch", **plot_params_batch)
        except Exception as e:
            print("Check the error; most likely there are too many batches to plot")
            print(e)

    def filter_and_save_batch_data(self, adata, prefix):
        adata = filter_adata_by_batch(adata, self.batches_sample, batch_col=self.batch_col)
        self.compute_and_save_umap(adata.X, adata.obs, prefix, name=f"{self.n_batches_sample}{self.batch_col}")
        if self.plot_tsne:
            self.compute_and_save_tsne(adata.X, adata.obs, prefix, name=f"{self.n_batches_sample}{self.batch_col}")

    def select_batches(self, obs):
        batches = np.unique(obs[self.batch_col])
        batches_sample = np.random.choice(batches, size=self.n_batches_sample, replace=False)
        self.batches_sample = batches_sample
        return batches_sample

    def process_latent_data(self, input_prefix, obs, process_allbatches):
        latent_prefixes = self.df.loc[self.df["input_prefix"] == input_prefix, "latent_prefix"].values
        latent_paths = self.df.loc[self.df["input_prefix"] == input_prefix, "LatentPath"].values

        for latent_pref, latent_path in zip(latent_prefixes, latent_paths):
            print(latent_pref, latent_path)
            X = np.load(latent_path)
            if self.sample_size is not None:
                X = X[self.sampled_indices, :]
            adata = AnnData(X, obs=obs)
            if process_allbatches:
                self.compute_and_save_umap(X, obs, latent_pref)
                if self.plot_tsne:
                    self.compute_and_save_tsne(X, obs, latent_pref)
            self.filter_and_save_batch_data(adata, latent_pref)

    def get_dimensionality_reduction_plots(self, process_allbatches=True, seed=42, issparse=True):
        print("\nComputing UMAP and t-SNE for the following files:")
        input_prefixes = np.unique(self.df["input_prefix"])
        batches_sample = []
        np.random.seed(seed)

        for i, input_prefix in enumerate(input_prefixes):
            input_path = self.df.loc[self.df["input_prefix"] == input_prefix, "InputsPath"].values[0]
            print(input_prefix, input_path)

            X, var, obs = read_adata(input_path, issparse=issparse)
            if issparse:
                X = X.toarray()
            X = self.apply_scaling(X)
            X, obs = self.subset_data(X, obs)
            adata = AnnData(X, obs=obs)

            if process_allbatches:
                self.compute_and_save_umap(X, obs, "input_" + input_prefix)
                if self.plot_tsne:
                    self.compute_and_save_tsne(X, obs, "input_" + input_prefix)

            if i == 0:
                batches_sample = self.select_batches(obs)

            self.filter_and_save_batch_data(adata, "input_" + input_prefix)
            self.process_latent_data(input_prefix, obs, process_allbatches)

            gc.collect()

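
# Illustrative end-to-end sketch (output directory and plot settings are
# placeholders): build the processor on a merged path table and produce UMAP
# (and optionally t-SNE) plots. The plot_params keys are assumed keyword
# arguments of plot_rep.
def _example_dimensionality_reduction(df_paths, out_dir):
    plot_params = {"color_col": "celltype", "shape_col": "celltype"}
    processor = DimensionalityReductionProcessor(
        df_paths, out_dir, plot_params,
        sample_size=10000, n_neighbors=15, scaling="min_max",
        n_batches_sample=20, batch_col="batch", plot_tsne=False,
    )
    processor.get_dimensionality_reduction_plots(process_allbatches=True, seed=42, issparse=True)
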