import scanpy as sc
import h5py
from scipy.sparse import csr_matrix
import anndata
import numpy as np
import pandas as pd
import glob
def scRNAseq_pipeline(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000, n_top_genes=1000, n_components=50):
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata, min_cells=min_cells_per_gene)
# 2. Normalize with total UMI count per cell
sc.pp.normalize_total(adata, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata)
# 4. Select highly variable genes
sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes)
# Filter the data for highly variable genes
adata = adata[:, adata.var.highly_variable]
# 5. Apply PCA: This solver uses the ARPACK implementation of the truncated singular value decomposition (SVD). It's more suitable for smaller datasets or when you need a small number of components (principal components). It can be slower but is typically more accurate, especially when the number of requested components is much smaller than the number of features.
sc.tl.pca(adata, svd_solver='arpack', n_comps=n_components)
return adata
def plot_pca(adata, sample_id_column='sampleID'):
sc.pl.pca(adata, color=sample_id_column)
# import scanpy as sc
def scRNAseq_pipeline_log(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000):
# Create a copy of the data to keep the raw data unchanged
adata_copy = adata.copy()
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata_copy, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata_copy, min_cells=min_cells_per_gene)
# 2. Normalize with total UMI count per cell
sc.pp.normalize_total(adata_copy, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata_copy)
return adata_copy
def scRNAseq_pipeline_loghvg(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000,n_top_genes=3000):
# Create a copy of the data to keep the raw data unchanged
adata_copy = adata.copy()
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata_copy, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata_copy, min_cells=min_cells_per_gene)
# 2. Normalize with total UMI count per cell
sc.pp.normalize_total(adata_copy, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata_copy)
# 4. Select highly variable genes and subset
sc.pp.highly_variable_genes(adata_copy, n_top_genes=n_top_genes, subset=True)
return adata_copy
def scRNAseq_pipeline_v2(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000, n_top_genes=1000, n_components=50):
# Create a copy of the data to keep the raw data unchanged
adata_copy = adata.copy()
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata_copy, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata_copy, min_cells=min_cells_per_gene)
# 2. Normalize with total UMI count per cell
sc.pp.normalize_total(adata_copy, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata_copy)
# 4. Select highly variable genes and subset
sc.pp.highly_variable_genes(adata_copy, n_top_genes=n_top_genes, subset=True)
# 5. Scale the data
sc.pp.scale(adata_copy)
# 6. Apply PCA
sc.tl.pca(adata_copy, svd_solver='arpack', n_comps=n_components)
# 7. Compute the neighborhood graph
sc.pp.neighbors(adata_copy)
# 8. Run UMAP
sc.tl.umap(adata_copy)
# Return the processed data
return adata_copy
def plot_results(adata_copy, batch_column="BATCH", celltype_column="celltype",file_name ='heart_latent'):
# Plot UMAP with color annotations for batch and cell type
sc.pl.pca(adata_copy,color=[batch_column, celltype_column],save=file_name+'.png')
sc.pl.umap(adata_copy,color=[batch_column, celltype_column],save=file_name+'.png')
# Loader for the Healthy Heart dataset
class H5ADLoader:
"""
Loader class for reading .h5ad files and converting them into AnnData objects.
This loader is useful to load Heart datasets from Yu et al 2023.
Link to Heart dataset downloads: https://figshare.com/articles/dataset/Batch_Alignment_of_single-cell_transcriptomics_data_using_Deep_Metric_Learning/20499630/2?file=38987759
Attributes:
file_path (str): Path to the .h5ad file to be loaded.
adata_dict (dict): Dictionary containing the loaded data from the .h5ad file. Initially set to None.
Methods:
load_h5ad():
Reads the .h5ad file specified by `file_path`, extracting its components into `adata_dict`.
It handles the main data matrix 'X' as a sparse matrix and other components such as 'obs', 'var',
'obsm', 'obsp', 'uns', 'layers', 'varm', 'varp'. For 'obs', it particularly converts categories into
pandas.Categorical objects.
_handle_obs(obs_group):
Helper method to process the 'obs' group from the .h5ad file. It converts categorical data into
pandas.Categorical objects for easier manipulation in pandas DataFrames.
_handle_other_components(adata, file, key):
Helper method to process components other than 'X' and 'obs'. This includes 'var', 'obsm', 'obsp',
'uns', 'layers', 'varm', 'varp'. Data is added to `adata_dict` in an appropriate format (numpy arrays or dictionaries).
create_anndata():
Converts `adata_dict` into an AnnData object. This method should be called after `load_h5ad` to ensure
data has been loaded. It handles the creation of the AnnData object by properly assigning 'X', 'obs',
and 'var', and additionally includes 'obsm', 'obsp', 'layers', 'uns' if they are present in `adata_dict`.
Usage:
loader = H5ADLoader(file_path)
loader.load_h5ad() # Loads data from .h5ad file
adata = loader.create_anndata() # Converts loaded data into AnnData object
"""
def __init__(self, file_path):
self.file_path = file_path
self.adata_dict = None
def load_h5ad(self):
with h5py.File(self.file_path, "r") as f:
adata = {}
if {'data', 'indices', 'indptr'}.issubset(f['X']):
print("Reading X")
data = f['X']['data'][:]
indices = f['X']['indices'][:]
indptr = f['X']['indptr'][:]
shape = f['X'].attrs['shape']
# Check if matrix is transposed based on indptr length
transposed = len(indptr) != shape[0] + 1
correct_shape = (shape[1], shape[0]) if transposed else shape
try:
X = csr_matrix((data, indices, indptr), shape=correct_shape)
if transposed:
adata['X'] = X.T
else:
adata['X']=X
#adata['transposed'] = transposed
print(f"CSR matrix created successfully with shape: {correct_shape}, Transposed: {transposed}")
except ValueError as e:
print(f"Error creating CSR matrix: {e}")
else:
print("Required keys ('data', 'indices', 'indptr') not found in 'X'.")
for key in f.keys():
print("Reading:",key)
if key !='X':
if key == 'obs':
obs_data = self._handle_obs(f[key])
adata['obs'] = pd.DataFrame(obs_data)
else:
self._handle_other_components(adata, f, key)
# Convert 'var' to DataFrame
if 'var' in adata:
adata['var'] = pd.DataFrame(adata['var'])
self.adata_dict = adata
def _handle_obs(self, obs_group):
obs_data = {}
for subkey in obs_group:
item = obs_group[subkey]
if isinstance(item, h5py.Group):
categories_dataset = item['categories']
codes_dataset = item['codes']
categories = categories_dataset[:]
codes = codes_dataset[:]
obs_data[subkey] = pd.Categorical.from_codes(codes, categories.astype('U'))
else:
obs_data[subkey] = item[:]
return obs_data
def _handle_other_components(self, adata, file, key):
if isinstance(file[key], h5py.Group):
adata[key] = {subkey: np.array(file[key][subkey]) for subkey in file[key]}
if not adata[key]: # Check if the dictionary is empty
print(f"No data found under key '{key}'")
else:
adata[key] = np.array(file[key])
if adata[key].size == 0: # Check if the array is empty
print(f"No data found under key '{key}'")
def create_anndata(self):
if self.adata_dict is None:
print("Data not loaded. Please load the .h5ad file first.")
return None
# Extract data from the dictionary
X = self.adata_dict.get('X', None)
obs = self.adata_dict.get('obs', None)
var = self.adata_dict.get('var', None)
# Create AnnData object
ann_data = anndata.AnnData(X=X, obs=obs, var=var)
# Add other components if they exist
for key in ['obsm', 'obsp', 'layers', 'uns']:
if key in self.adata_dict:
setattr(ann_data, key, self.adata_dict[key])
return ann_data
# Loader for AML dataset
class AML_data_reader:
"""Class to read adata from vanGalen et al 2019. Downloaded from Gene Expression Obnibus with acession number GSE116256. A count matrix for each individual is stored separately in a zip file"""
def __init__(self, parent_path):
self.parent_path = parent_path
def extract_accession(self,annotation):
id = annotation.split("/")[-1].split('-')[0].split('_')[0]
return id
def extract_id(self,annotation):
id = annotation.split("/")[-1].split('-')[0].split('_')[1].split('.')[0]
return id
def extract_file_note(self, annotation):
ID = self.extract_id(annotation)
day_part = annotation.split("D")[-1].split('.')[0]
note = annotation.split("BM")[-1].split('.')[0]
return_ = np.nan
if 'AML' in ID:
return_ = 'D' + day_part
elif 'BM' in ID:
return_ = 'BM' + note
return return_
def add_Day_column(self,df_paths):
# PREREQUISITE: Have file_note col
result_list = []
for d in df_paths['file_note']:
if isinstance(d, str) and 'D' in d:
result_list.append(d) # Keep the string if it contains 'D'
else:
result_list.append(np.nan) # Replace non-matching entries with np.nan
df_paths['Day'] = result_list
return df_paths
def add_real_id_column(self, df_paths):
df_paths["unique_id"] = df_paths["id"].copy()
df_paths["Patient_group"] = np.nan
for i in range(len(df_paths)):
day = df_paths.loc[i, 'Day']
id = df_paths.loc[i, 'id']
if "AML" in id:
df_paths.loc[i, "Patient_group"] = "AML"
elif "BM" in id:
df_paths.loc[i, "Patient_group"] = "control"
elif id in ['MUTZ3', 'OCI']:
df_paths.loc[i, "Patient_group"] = "cellline"
if not pd.isna(day):
df_paths.at[i, "unique_id"] = id + "_" + str(day)
elif id == "BM5":
df_paths.at[i, "unique_id"] = df_paths.loc[i, 'file_note']
return df_paths
def get_df_paths(self):
anno_list = glob.glob(self.parent_path + '/*anno*')
matrix_list = glob.glob(self.parent_path + '/*dem*')
anno_df = pd.DataFrame({'anno_path': anno_list,
'id': [self.extract_id(m) for m in anno_list],
'file_note': [self.extract_file_note(m) for m in anno_list],
'accession_anno_num': [self.extract_accession(m) for m in anno_list]})
matrix_df = pd.DataFrame({'matrix_path': matrix_list,
'id': [self.extract_id(m) for m in matrix_list],
'file_note': [self.extract_file_note(m) for m in matrix_list],
'accession_matrix_num': [self.extract_accession(m) for m in matrix_list]})
df_paths = pd.merge(matrix_df, anno_df, on=["id", "file_note"], how="outer")
df_paths = self.add_Day_column(df_paths)
df_paths = self.add_real_id_column(df_paths)
# count the occurrences id, Patient_group
grouped_counts = df_paths.groupby(['id', 'Patient_group']).size().reset_index(name='counts')
print(grouped_counts)
return df_paths
#return anno_df, matrix_df
def create_adata_dict(self, df_paths):
adata_dict = {}
for i in range(len(df_paths)):
matrix_path_i = df_paths.loc[i, 'matrix_path']
cm = pd.read_csv(matrix_path_i, sep='\t', index_col=0).T
anno_path_i = df_paths.loc[i, 'anno_path']
meta = pd.read_csv(anno_path_i, sep='\t', index_col=0).reset_index()
for col in ['id', 'Day', 'unique_id', 'Patient_group']:
meta[col] = df_paths.loc[i, col]
unique_id = df_paths.loc[i, 'unique_id']
if meta.shape[0] == cm.shape[0]:
adata = anndata.AnnData(X=cm.values, obs=meta, var=pd.DataFrame(cm.columns))
adata_dict[unique_id] = adata
return adata_dict
def merge_adata_objects(self, adata_dict):
equal_genes = True
key_0 = next(iter(adata_dict.keys()))
for key in adata_dict:
if key != key_0 and not adata_dict[key_0].var.equals(adata_dict[key].var):
equal_genes = False
print(f"{key} genes are different")
break
if not equal_genes:
print("Check that your genes are equal for all individual count matrices")
return None
all_obs = pd.concat([adata.obs for adata in adata_dict.values()], ignore_index=True)
all_X = np.concatenate([adata.X for adata in adata_dict.values()], axis=0)
all_var = next(iter(adata_dict.values())).var
return anndata.AnnData(X=all_X, obs=all_obs, var=all_var)