import numpy as np
import pandas as pd
import os
from sklearn.metrics import davies_bouldin_score, calinski_harabasz_score
from tensorflow.keras.callbacks import Callback
def get_clustering_scores(X, labels, sample_size=None):
""" Computes and returns clustering scores more efficiently. """
if sample_size is not None: #and sample_size < X.shape[0]:
indices = np.random.choice(X.shape[0], sample_size, replace=False)
X = X[indices]
labels = labels[indices]
db_score = davies_bouldin_score(X, labels)
ch_score = calinski_harabasz_score(X, labels)
return db_score, 1 / db_score, ch_score
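
# A minimal sketch of calling get_clustering_scores directly; make_blobs and
# the demo variable names are illustrative stand-ins, not part of the pipeline.
# from sklearn.datasets import make_blobs
# X_demo, y_demo = make_blobs(n_samples=500, centers=3, random_state=0)
# db, inv_db, ch = get_clustering_scores(X_demo, y_demo, sample_size=200)
# print(f'DB={db:.3f}  1/DB={inv_db:.3f}  CH={ch:.1f}')
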
def compute_latents_and_scores(model, X, metadata, labels_list, output_dir, epoch, sample_size=None, batch_size=32, model_type="ae_da"):
    """
    Computes latents and clustering scores, handling file I/O and per-label
    score logging in one pass.
    Parameters:
    - model: Trained model exposing an `encoder` (or `re_encoder`) to predict from.
    - X: Input features for prediction; a tuple (x, z) when model_type is "ae_re".
    - metadata: DataFrame containing the label columns used for clustering scores.
    - labels_list: List of column names from metadata to compute scores for.
    - output_dir: Directory path to save outputs (created if missing).
    - epoch: Current epoch number (zero-based index).
    - sample_size: Optional; number of samples to use for score computation.
    - batch_size: Batch size passed to predict.
    - model_type: "ae_re" to use the re-encoder on (x, z) inputs; any other
      value (default "ae_da") uses the plain encoder on x.
    Returns:
    - Dictionary of scores keyed by '<label>_DB', '<label>_1/DB', and '<label>_CH'.
    """
    def get_input_data(input_data, model_type):
        """Select the encoder input based on model type and data structure."""
        if isinstance(input_data, tuple):
            # A tuple is assumed to be (x, z): "ae_re" takes both as encoder
            # inputs, every other model type uses x only.
            return input_data if model_type == "ae_re" else input_data[0]
        return input_data
    X = get_input_data(X, model_type)
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    # Get the latent representation
if model_type == "ae_re":
latent = model.re_encoder.predict(X, batch_size=batch_size)
else:
# print("real use_layer_activations:", model.encoder.return_layer_activations)
# Check if the encoder object has the 'return_layer_activations' attribute and if it is True
use_layer_activations = hasattr(model.encoder, 'return_layer_activations') and model.encoder.return_layer_activations
# print("use_layer_activations:", use_layer_activations)
outputs = model.encoder.predict(X, batch_size=batch_size)
latent = outputs[-1] if use_layer_activations else outputs
# print(latent,"latent")
epoch_str = f'epoch{epoch+1:03d}'
pd.DataFrame(latent).to_pickle(os.path.join(output_dir, f'{epoch_str}_latents.pkl'))
    scores_data = {}
    for label_col in labels_list:
        labels = metadata[label_col]
        # Clustering scores on the latent space for this label column
        scores = get_clustering_scores(latent, labels, sample_size)
        scores_data.update({f'{label_col}_DB': scores[0], f'{label_col}_1/DB': scores[1], f'{label_col}_CH': scores[2]})
        score_path = os.path.join(output_dir, f'clustering_scores_{label_col}.csv')
        with open(score_path, 'a') as file:
            if epoch == 0:  # Write the header once, at the first epoch
                file.write('epoch,DB,1/DB,CH\n')
            file.write(f'{epoch+1},{scores[0]},{scores[1]},{scores[2]}\n')
    return scores_data
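
# A minimal sketch of a one-off (non-callback) call; trained_model, X_val,
# metadata_df, and the output path are hypothetical placeholders.
# scores = compute_latents_and_scores(trained_model, X_val, metadata_df,
#                                     ['batch', 'bio'], '/path/to/output',
#                                     epoch=0, sample_size=5000)
# print(scores)  # {'batch_DB': ..., 'batch_1/DB': ..., 'batch_CH': ..., ...}
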
class ComputeLatentsCallback(Callback):
    """Keras callback that writes latents and clustering scores at each epoch end."""

    def __init__(self, model, X, metadata, labels_list, output_dir, sample_size=None, batch_size=32, model_type="ae_da"):
        super().__init__()
        # Stored under a separate name to avoid clashing with the `model`
        # attribute that Keras itself manages on Callback instances.
        self.target_model = model
        self.X = X
        self.metadata = metadata
        self.labels_list = labels_list
        self.output_dir = output_dir
        self.sample_size = sample_size
        self.batch_size = batch_size
        self.model_type = model_type

    def on_epoch_end(self, epoch, logs=None):
        compute_latents_and_scores(self.target_model, self.X, self.metadata, self.labels_list,
                                   self.output_dir, epoch, self.sample_size,
                                   self.batch_size, self.model_type)
# Usage
# model_callback = ComputeLatentsCallback(model, X_train, metadata_df, ['batch', 'bio'], '/path/to/output')
# model.fit(X_train, y_train, epochs=10, callbacks=[model_callback])
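
# A minimal sketch of inspecting the artifacts written above; the paths and
# the label name ('batch') are illustrative assumptions.
# scores_df = pd.read_csv('/path/to/output/clustering_scores_batch.csv')
# print(scores_df.tail())            # DB / 1/DB / CH trajectory over epochs
# latents = pd.read_pickle('/path/to/output/epoch010_latents.pkl')
# print(latents.shape)               # (n_samples, latent_dim)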