"""Training script for the lip-reading model with a CTC loss.

The module builds a training wrapper around ``LipReadingModel`` whose single
output is the per-sample CTC loss, trains it with a warmup + step-decay
learning-rate schedule, and after every epoch decodes one test batch and one
train batch so progress can be inspected on disk under ``train_res/``.
"""

import gc
import logging
import os
import sys

from tensorflow.keras.backend import ctc_batch_cost
# from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

from preprocessing import *
from model import LipReadingModel
from datapipeline import *

# These explicit imports intentionally come after the star imports above so
# that the names below always refer to the real libraries, whatever the
# project modules happen to re-export.
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Lambda, Input
from tensorflow.keras.models import Model

# strategy = tf.distribute.MirroredStrategy()

char_to_num, num_to_char = vocabulary()

# Alternative session / device configuration, kept for reference:
# config = tf.compat.v1.ConfigProto()
# config.intra_op_parallelism_threads = 4
# config.inter_op_parallelism_threads = 2
# session = tf.compat.v1.Session(config=config)
# tf.compat.v1.keras.backend.set_session(session)
# physical_devices = tf.config.list_physical_devices('GPU')
# tf.config.experimental.set_memory_growth(physical_devices[0], True)
# tf.compat.v1.enable_eager_execution()

# Let TensorFlow grow GPU memory on demand instead of reserving it all.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Memory growth must be configured before the GPUs are initialised.
        print(e)

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'  # or set to '1'
logging.getLogger('tensorflow').setLevel(logging.DEBUG)

max_num_frames = 150  # 5 seconds
IMG_WIDTH = 100
IMG_HEIGHT = 50

# Learning-rate schedule parameters (see ``scheduler``).
WARMUP_EPOCHS = 3
WARMUP_START_LR = 1e-5
TARGET_LR = 2e-4
DECAY_EVERY = 8
DECAY_RATE = 0.9

# Adjustable learning rate.
#
# Previous schedule, kept for reference:
# def scheduler(epoch, lr):
#     # if epoch < 50:
#     #     return lr
#     # else:
#     #     cycle = (epoch - 50) // 50
#     #     if cycle % 2 == 0:
#     #         return float(lr * tf.math.exp(-0.1))
#     #     else:
#     #         return lr
#     remainder = epoch % 15
#     if remainder == 0:
#         return lr*0.95
#     else:
#         return lr


def scheduler(epoch, lr):
    """Return the learning rate to use for ``epoch`` (0-based).

    Epochs ``0 .. WARMUP_EPOCHS - 1`` ramp linearly from ``WARMUP_START_LR``
    to ``TARGET_LR``.  Afterwards the rate is ``TARGET_LR`` multiplied by
    ``DECAY_RATE`` once for every ``DECAY_EVERY`` completed post-warmup epochs.

    Args:
        epoch: Index of the epoch about to start.
        lr: Current optimizer learning rate.  Unused: the schedule is a pure
            function of ``epoch``; the parameter exists because
            ``LearningRateScheduler`` passes it.

    Returns:
        The learning rate as a Python float.
    """
    if epoch < WARMUP_EPOCHS:
        # Divide by (WARMUP_EPOCHS - 1) so the last warmup epoch lands exactly
        # on TARGET_LR; max() guards a one-epoch warmup against dividing by 0.
        progress = epoch / max(1, WARMUP_EPOCHS - 1)
        return float(WARMUP_START_LR + (TARGET_LR - WARMUP_START_LR) * progress)

    decays = (epoch - WARMUP_EPOCHS) // DECAY_EVERY
    return float(TARGET_LR * (DECAY_RATE ** decays))


class SaveHistoryCallback(tf.keras.callbacks.Callback):
    """Accumulate per-epoch metrics and periodically back them up to disk.

    Args:
        save_path: Directory the history files are written to.  It is created
            on first save if it does not exist.
        interval: Write a backup every ``interval`` epochs.
    """

    def __init__(self, save_path, interval=10):
        super(SaveHistoryCallback, self).__init__()
        self.save_path = save_path
        self.interval = interval
        self.history = {}

    def on_epoch_end(self, epoch, logs=None):
        """Record ``logs`` and write a backup on every ``interval``-th epoch."""
        if logs is not None:
            for key, value in logs.items():
                self.history.setdefault(key, []).append(value)
        if (epoch + 1) % self.interval == 0:
            self.save_history()

    def save_history(self):
        """Write every recorded metric series to ``history_epoch_<n>.txt``."""
        epochs_done = len(self.history.get("loss", []))
        # The target directory is not guaranteed to exist on a fresh checkout.
        os.makedirs(self.save_path, exist_ok=True)
        save_file = os.path.join(self.save_path, f'history_epoch_{epochs_done}.txt')
        with open(save_file, 'w') as f:
            for key, values in self.history.items():
                f.write(f'{key}: {values}\n')
        print(f'Saved history at epoch {epochs_done}')


class ProduceExample(tf.keras.callbacks.Callback):
    """Decode one test batch and one train batch after every epoch.

    For each sample the reference text and the decoded prediction are printed
    and written to ``train_res/<epoch>/<split>/NN.txt`` together with an image
    of the model's output matrix.  Every 10th epoch the input frames of the
    batch are dumped as images as well.

    Args:
        train_ds: Training dataset; must support ``repeat()`` and
            ``as_numpy_iterator()``.
        test_ds: Test dataset with the same structure.
        infer_model: Model mapping frames to per-timestep class probabilities
            (the base lip-reading model, not the CTC training wrapper).
        num_to_char: Callable mapping token ids to characters.
    """

    def __init__(self, train_ds, test_ds, infer_model, num_to_char) -> None:
        super().__init__()
        # Endless numpy iterators: every epoch pulls the next batch of each.
        self.train_iter = train_ds.repeat().as_numpy_iterator()
        self.test_iter = test_ds.repeat().as_numpy_iterator()
        self.infer_model = infer_model
        self.num_to_char = num_to_char

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Run the example prediction for the test split, then the train split."""
        print("\nPredicting from test set")
        self._dump_split("test", self.test_iter, epoch)

        print("Predicting from train set")
        self._dump_split("train", self.train_iter, epoch)
        gc.collect()

    def _dump_split(self, split, iterator, epoch):
        """Predict one batch from ``iterator`` and write results for ``split``."""
        batch = next(iterator)
        results = self.predict_batch(batch)
        out_dir = os.path.join("train_res", str(epoch).zfill(4), split)
        self.write_data(out_dir, results)
        if (epoch + 1) % 10 == 0:
            self.write_imgs(out_dir, batch)

    def _ids_to_text(self, ids):
        """Map a 1-D array of token ids to a Python string."""
        return tf.strings.reduce_join(self.num_to_char(ids)).numpy().decode("utf-8")

    def predict_batch(self, batch):
        """Decode a batch and return ``[reference, prediction, probs]`` per sample.

        Args:
            batch: ``(inputs_dict, dummy_targets)`` where ``inputs_dict`` has
                the keys ``"frames"``, ``"labels"``, ``"input_len"`` and
                ``"label_len"``.  Expected shapes (per the data pipeline,
                not checked here): frames ``[B, T, H, W, 1]``, labels
                ``[B, max_L]``, both lengths ``[B, 1]``.

        Returns:
            A list with one ``[orig_str, pred_str, yhat_i]`` entry per sample,
            where ``yhat_i`` is that sample's raw model output matrix.
        """
        inputs, _ = batch
        features = inputs["frames"]
        labels = inputs["labels"]
        input_len = inputs["input_len"]
        label_len = inputs["label_len"]
        batch_size = features.shape[0]

        yhat = self.infer_model.predict(features)

        # ctc_decode expects one length per sample.  Clamp to the number of
        # timesteps the model actually produced so a mismatch between the
        # pipeline's lengths and the output cannot crash this diagnostic.
        seq_lengths = np.minimum(
            np.asarray(input_len).reshape(-1), yhat.shape[1]
        ).astype(np.int32)
        decoded_paths, _ = tf.keras.backend.ctc_decode(
            yhat, input_length=seq_lengths, greedy=False
        )
        # Best beam as a dense [B, max_decoded_len] array.
        decoded = decoded_paths[0].numpy()

        label_lengths = np.asarray(label_len).reshape(-1)
        results = []
        for i in range(batch_size):
            # Reference ids without the right-padding.
            orig_ids = labels[i, :int(label_lengths[i])]
            orig_str = self._ids_to_text(orig_ids)

            # ctc_decode pads with -1, and id 0 was already being discarded
            # here; keep only strictly positive ids.
            pred_ids = decoded[i]
            pred_ids = pred_ids[pred_ids > 0]
            pred_str = self._ids_to_text(pred_ids)

            print("Original is: ", orig_str)
            print("Prediction is:", pred_str)
            print("~" * 100)

            results.append([orig_str, pred_str, yhat[i]])
        return results

    def write_data(self, dir, data):
        """Write reference/prediction text and an output-matrix image per sample.

        Args:
            dir: Output directory; created if missing.
            data: Entries as returned by ``predict_batch``.
        """
        os.makedirs(dir, exist_ok=True)
        for i, x in enumerate(data):
            fn_txt = os.path.join(dir, f"{i:02d}.txt")
            with open(fn_txt, "w") as f:
                f.write(x[0] + "\n")
                f.write(x[1])

            fn_img = os.path.join(dir, f"{i:02d}.png")
            print(x[2].shape)
            # Scale to 0..255, enlarge 4x along both axes, then rotate by
            # 90 degrees before saving as a grayscale image.
            np_array_uint8 = (x[2] * 255).astype(np.uint8)
            upscaled = np.repeat(np_array_uint8, 4, axis=0)
            upscaled = np.repeat(upscaled, 4, axis=1)
            rotated = np.rot90(upscaled)
            plt.imsave(fn_img, rotated, cmap="gray")

    def write_imgs(self, dir, batch):
        """Save every non-empty input frame of ``batch`` as ``BB_FFFF.png``.

        Args:
            dir: Output directory; created if missing.
            batch: ``(inputs_dict, dummy_targets)`` as in ``predict_batch``.
        """
        inputs, _ = batch
        features = inputs["frames"]
        os.makedirs(dir, exist_ok=True)
        for i in range(features.shape[0]):
            for j in range(features.shape[1]):
                img = features[i, j, :, :, 0]
                # All-zero frames are skipped (presumably padding - confirm
                # against the data pipeline).
                if not np.all(img == 0):
                    fn_img = os.path.join(dir, f"{i:02d}_{j:04d}.png")
                    plt.imsave(fn_img, img, cmap="gray")


def CTCLoss(y_true, y_pred):
    """CTC loss that treats every sequence as full length.

    The input length is taken as ``y_pred.shape[1]`` and the label length as
    ``y_true.shape[1]`` for every sample, i.e. padding is not accounted for.
    ``ctc_lambda_layer`` is the variant that uses per-sample lengths.

    Args:
        y_true: Label ids, indexed as ``[batch, label_len]``.
        y_pred: Model output, indexed as ``[batch, time, classes]``.

    Returns:
        The tensor returned by ``tf.keras.backend.ctc_batch_cost``.
    """
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Broadcast the scalar lengths to the [batch, 1] shape ctc_batch_cost takes.
    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

    loss = tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss


def ctc_lambda_layer(args):
    """Compute the per-sample CTC loss inside a ``Lambda`` layer.

    Args:
        args: ``[labels, y_pred, input_len, label_len]`` with
            labels ``[B, max_L]``, y_pred ``[B, T, num_classes]`` (softmax
            probabilities), input_len ``[B, 1]`` and label_len ``[B, 1]``.

    Returns:
        The ``[B, 1]`` loss tensor from ``K.ctc_batch_cost``.
    """
    labels, y_pred, input_len, label_len = args

    # ctc_batch_cost takes a log of the probabilities; keep them off 0 and 1.
    y_pred = tf.clip_by_value(y_pred, K.epsilon(), 1.0 - K.epsilon())

    input_len = tf.cast(input_len, tf.int32)
    label_len = tf.cast(label_len, tf.int32)

    return K.ctc_batch_cost(labels, y_pred, input_len, label_len)


def build_ctc_training_model(max_num_frames, img_height, img_width, num_classes, max_num_tokens):
    """Wrap ``LipReadingModel`` in a model whose output is the CTC loss.

    Args:
        max_num_frames: Number of frames per (padded) clip.
        img_height: Frame height in pixels.
        img_width: Frame width in pixels.
        num_classes: Currently unused; the output size is whatever
            ``LipReadingModel`` defines.  Kept so callers can state the
            expected value.
        max_num_tokens: Length of the (padded) label sequence.

    Returns:
        ``(train_model, lip_model)``: the training wrapper taking the inputs
        ``frames``, ``labels``, ``input_len`` and ``label_len``, and the base
        model it wraps (the same object, so training updates it in place).
    """
    lip_model = LipReadingModel()
    # Build up front so the layer shapes are known.
    lip_model.build((None, max_num_frames, img_height, img_width, 1))

    frames_in = Input(
        shape=(max_num_frames, img_height, img_width, 1),
        name="frames",
        dtype=tf.float32,
    )
    labels_in = Input(shape=(max_num_tokens,), name="labels", dtype=tf.int32)
    input_len_in = Input(shape=(1,), name="input_len", dtype=tf.int64)
    label_len_in = Input(shape=(1,), name="label_len", dtype=tf.int64)

    y_pred = lip_model(frames_in)

    loss_out = Lambda(ctc_lambda_layer, name="ctc_loss")(
        [labels_in, y_pred, input_len_in, label_len_in]
    )

    train_model = Model(
        inputs=[frames_in, labels_in, input_len_in, label_len_in],
        outputs=loss_out,
        name="lipreading_ctc_train_model",
    )
    return train_model, lip_model


def learning(feature_folder):
    """Set up the data pipelines, model and callbacks, then train.

    Args:
        feature_folder: Currently unused; the data is read from the
            hard-coded ``./features/exported`` directory.

    Returns:
        The ``History`` object returned by ``Model.fit``.
    """
    export_root = "./features/exported"
    stats_file = "./features/exported/global_stats.json"

    # NOTE(review): max_num_tokens is not defined in this file; it is assumed
    # to be provided by one of the star imports - confirm.
    pad_x = (max_num_frames, IMG_HEIGHT, IMG_WIDTH, 1)
    pad_y = (max_num_tokens,)

    train_data = make_stream(export_root, "train", batch_size=8,
                             pad_x=pad_x, pad_y=pad_y, stats_json=stats_file)
    test_data = make_stream(export_root, "test", batch_size=2,
                            pad_x=pad_x, pad_y=pad_y, stats_json=stats_file,
                            shuffle_files=False)

    # num_classes must match the size of the model's final output layer.
    char_to_num, _ = vocabulary()
    num_classes = len(char_to_num.get_vocabulary())

    train_model, base_model = build_ctc_training_model(
        max_num_frames=max_num_frames,
        img_height=IMG_HEIGHT,
        img_width=IMG_WIDTH,
        num_classes=num_classes,
        max_num_tokens=max_num_tokens,
    )

    # lr = 6 * 8e-05
    # optimizer = Adam(learning_rate=lr)
    optimizer = Adam(learning_rate=2e-4, clipnorm=1.0)

    # The model's only output already is the loss, so the Keras "loss" just
    # averages it and ignores the dummy targets.
    train_model.compile(
        optimizer=optimizer,
        loss={"ctc_loss": lambda y_true, y_pred: tf.reduce_mean(y_pred)},
        jit_compile=False,
    )
    train_model.summary()

    best_model_callback = ModelCheckpoint(
        filepath='./weights/best_model.weights.h5',
        monitor='val_loss',
        save_best_only=True,
        save_weights_only=True,
        mode='min',
        verbose=1,
    )
    latest_model_callback = ModelCheckpoint(
        filepath='./weights/latest_model.weights.h5',
        save_weights_only=True,
        save_freq='epoch',
        verbose=1,
    )
    schedule_callback = LearningRateScheduler(scheduler)
    example_callback = ProduceExample(
        train_data,
        test_data,
        infer_model=base_model,
        num_to_char=num_to_char,
    )
    # interval=5 keeps the every-5-epochs backups this script has always
    # produced (the callback used to ignore its interval argument).
    save_history_callback = SaveHistoryCallback(
        save_path='./history_logs',
        interval=5,
    )

    num_epochs = 25
    history = train_model.fit(
        train_data,
        validation_data=test_data,
        epochs=num_epochs,
        callbacks=[
            best_model_callback,
            latest_model_callback,
            schedule_callback,
            example_callback,
            save_history_callback,
        ],
    )

    # NOTE(review): the checkpoints above contain the weights of train_model
    # (the CTC wrapper).  For inference either rebuild the wrapper with
    # build_ctc_training_model(), load the weights into it and use the
    # returned base model, or verify that the file loads directly into a
    # freshly built LipReadingModel before relying on that.
    return history