import anndata
import h5py
import numpy as np
import pandas as pd
import scanpy as sc
from scipy.sparse import csr_matrix
def scRNAseq_pipeline(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000, n_top_genes=1000, n_components=50):
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata, min_cells=min_cells_per_gene)
    # 2. Normalize each cell to a fixed total UMI count
sc.pp.normalize_total(adata, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata)
# 4. Select highly variable genes
sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes)
    # Subset to the highly variable genes
    adata = adata[:, adata.var.highly_variable]
    # 5. Apply PCA. The 'arpack' solver uses the ARPACK implementation of
    # truncated singular value decomposition (SVD). It is well suited to
    # smaller datasets or when only a few principal components are needed;
    # it can be slower but is typically more accurate when the number of
    # requested components is much smaller than the number of features.
    sc.tl.pca(adata, svd_solver='arpack', n_comps=n_components)
return adata
def plot_pca(adata, sample_id_column='sampleID'):
sc.pl.pca(adata, color=sample_id_column)
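# Example usage (a minimal sketch; 'heart.h5ad' is a placeholder path, and
# 'sampleID', the default of plot_pca, must exist as a column in adata.obs):
#
#     adata = sc.read_h5ad('heart.h5ad')
#     adata = scRNAseq_pipeline(adata, n_top_genes=2000)
#     plot_pca(adata, sample_id_column='sampleID')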
def scRNAseq_pipeline_log(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000):
# Create a copy of the data to keep the raw data unchanged
adata_copy = adata.copy()
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata_copy, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata_copy, min_cells=min_cells_per_gene)
    # 2. Normalize each cell to a fixed total UMI count
sc.pp.normalize_total(adata_copy, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata_copy)
return adata_copy
def scRNAseq_pipeline_loghvg(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000, n_top_genes=3000):
# Create a copy of the data to keep the raw data unchanged
adata_copy = adata.copy()
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata_copy, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata_copy, min_cells=min_cells_per_gene)
    # 2. Normalize each cell to a fixed total UMI count
sc.pp.normalize_total(adata_copy, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata_copy)
# 4. Select highly variable genes and subset
sc.pp.highly_variable_genes(adata_copy, n_top_genes=n_top_genes, subset=True)
return adata_copy
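# Example usage (a minimal sketch; the input AnnData is assumed to hold raw
# counts, which both pipelines expect):
#
#     adata_log = scRNAseq_pipeline_log(adata)     # all genes kept
#     adata_hvg = scRNAseq_pipeline_loghvg(adata)  # subset to top 3000 HVGs
#     print(adata_log.shape, adata_hvg.shape)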
def scRNAseq_pipeline_v2(adata, min_genes_per_cell=10, min_cells_per_gene=3, total_counts_per_cell=10000, n_top_genes=1000, n_components=50):
# Create a copy of the data to keep the raw data unchanged
adata_copy = adata.copy()
# 1. Filter low quality cells and genes
sc.pp.filter_cells(adata_copy, min_genes=min_genes_per_cell)
sc.pp.filter_genes(adata_copy, min_cells=min_cells_per_gene)
    # 2. Normalize each cell to a fixed total UMI count
sc.pp.normalize_total(adata_copy, target_sum=total_counts_per_cell)
# 3. Logarithmize the data
sc.pp.log1p(adata_copy)
# 4. Select highly variable genes and subset
sc.pp.highly_variable_genes(adata_copy, n_top_genes=n_top_genes, subset=True)
# 5. Scale the data
sc.pp.scale(adata_copy)
# 6. Apply PCA
sc.tl.pca(adata_copy, svd_solver='arpack', n_comps=n_components)
# 7. Compute the neighborhood graph
sc.pp.neighbors(adata_copy)
# 8. Run UMAP
sc.tl.umap(adata_copy)
# Return the processed data
return adata_copy
def plot_results(adata_copy, batch_column="BATCH", celltype_column="celltype", file_name='heart_latent'):
    # Plot the PCA and UMAP embeddings, coloured by batch and cell type.
    # Scanpy prefixes the saved file name with the plot type ('pca'/'umap'),
    # so the two figures do not overwrite each other.
    sc.pl.pca(adata_copy, color=[batch_column, celltype_column], save=file_name + '.png')
    sc.pl.umap(adata_copy, color=[batch_column, celltype_column], save=file_name + '.png')
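# Example usage (a minimal sketch; 'heart.h5ad' is a placeholder path, and the
# 'BATCH' and 'celltype' defaults must exist as columns in adata.obs):
#
#     adata = sc.read_h5ad('heart.h5ad')
#     processed = scRNAseq_pipeline_v2(adata, n_top_genes=2000)
#     plot_results(processed, batch_column='BATCH', celltype_column='celltype')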
class H5ADLoader:
"""
Loader class for reading .h5ad files and converting them into AnnData objects.
    This loader is useful for loading the Heart dataset from Yu et al. (2023).
Link to Heart dataset downloads: https://figshare.com/articles/dataset/Batch_Alignment_of_single-cell_transcriptomics_data_using_Deep_Metric_Learning/20499630/2?file=38987759
Attributes:
file_path (str): Path to the .h5ad file to be loaded.
adata_dict (dict): Dictionary containing the loaded data from the .h5ad file. Initially set to None.
Methods:
load_h5ad():
Reads the .h5ad file specified by `file_path`, extracting its components into `adata_dict`.
It handles the main data matrix 'X' as a sparse matrix and other components such as 'obs', 'var',
'obsm', 'obsp', 'uns', 'layers', 'varm', 'varp'. For 'obs', it particularly converts categories into
pandas.Categorical objects.
_handle_obs(obs_group):
Helper method to process the 'obs' group from the .h5ad file. It converts categorical data into
pandas.Categorical objects for easier manipulation in pandas DataFrames.
_handle_other_components(adata, file, key):
Helper method to process components other than 'X' and 'obs'. This includes 'var', 'obsm', 'obsp',
'uns', 'layers', 'varm', 'varp'. Data is added to `adata_dict` in an appropriate format (numpy arrays or dictionaries).
create_anndata():
Converts `adata_dict` into an AnnData object. This method should be called after `load_h5ad` to ensure
data has been loaded. It handles the creation of the AnnData object by properly assigning 'X', 'obs',
and 'var', and additionally includes 'obsm', 'obsp', 'layers', 'uns' if they are present in `adata_dict`.
Usage:
loader = H5ADLoader(file_path)
loader.load_h5ad() # Loads data from .h5ad file
adata = loader.create_anndata() # Converts loaded data into AnnData object
"""
def __init__(self, file_path):
self.file_path = file_path
self.adata_dict = None
    def load_h5ad(self):
        with h5py.File(self.file_path, "r") as f:
            adata = {}
            # The sparse matrix 'X' is stored as three arrays: data, indices, indptr
            if isinstance(f['X'], h5py.Group) and {'data', 'indices', 'indptr'}.issubset(f['X'].keys()):
                print("Reading X")
                data = f['X']['data'][:]
                indices = f['X']['indices'][:]
                indptr = f['X']['indptr'][:]
                shape = tuple(f['X'].attrs['shape'])
                # A CSR indptr has one entry per row plus one; if its length
                # does not match, the matrix was stored compressed along the
                # other axis, i.e. transposed relative to the declared shape.
                transposed = len(indptr) != shape[0] + 1
                correct_shape = (shape[1], shape[0]) if transposed else shape
                try:
                    X = csr_matrix((data, indices, indptr), shape=correct_shape)
                    adata['X'] = X.T if transposed else X
                    print(f"CSR matrix created successfully with shape: {correct_shape}, transposed: {transposed}")
                except ValueError as e:
                    print(f"Error creating CSR matrix: {e}")
            else:
                print("Required keys ('data', 'indices', 'indptr') not found in 'X'.")
            for key in f.keys():
                print("Reading:", key)
                if key != 'X':
                    if key == 'obs':
                        obs_data = self._handle_obs(f[key])
                        adata['obs'] = pd.DataFrame(obs_data)
                    else:
                        self._handle_other_components(adata, f, key)
            # Convert 'var' to a DataFrame
            if 'var' in adata:
                adata['var'] = pd.DataFrame(adata['var'])
            self.adata_dict = adata
    def _handle_obs(self, obs_group):
        obs_data = {}
        for subkey in obs_group:
            item = obs_group[subkey]
            if isinstance(item, h5py.Group):
                # Categorical columns are stored as two datasets: integer
                # 'codes' indexing into a 'categories' array of labels.
                categories = item['categories'][:]
                codes = item['codes'][:]
                obs_data[subkey] = pd.Categorical.from_codes(codes, categories.astype('U'))
            else:
                obs_data[subkey] = item[:]
        return obs_data
def _handle_other_components(self, adata, file, key):
if isinstance(file[key], h5py.Group):
adata[key] = {subkey: np.array(file[key][subkey]) for subkey in file[key]}
if not adata[key]: # Check if the dictionary is empty
print(f"No data found under key '{key}'")
else:
adata[key] = np.array(file[key])
if adata[key].size == 0: # Check if the array is empty
print(f"No data found under key '{key}'")
def create_anndata(self):
if self.adata_dict is None:
print("Data not loaded. Please load the .h5ad file first.")
return None
# Extract data from the dictionary
X = self.adata_dict.get('X', None)
obs = self.adata_dict.get('obs', None)
var = self.adata_dict.get('var', None)
# Create AnnData object
ann_data = anndata.AnnData(X=X, obs=obs, var=var)
# Add other components if they exist
for key in ['obsm', 'obsp', 'layers', 'uns']:
if key in self.adata_dict:
setattr(ann_data, key, self.adata_dict[key])
return ann_data
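# Example usage (a minimal sketch; 'heart.h5ad' is a placeholder for the
# Yu et al. 2023 download linked in the class docstring):
if __name__ == "__main__":
    loader = H5ADLoader("heart.h5ad")
    loader.load_h5ad()
    adata = loader.create_anndata()
    if adata is not None:
        processed = scRNAseq_pipeline_v2(adata)
        plot_results(processed)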