from tensorflow.keras.backend import ctc_batch_cost
#from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from preprocessing import *
from model import LipReadingModel
from datapipeline import *
import sys
import numpy as np
import matplotlib.pyplot as plt
import gc
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Lambda, Input
from tensorflow.keras.models import Model
# Character <-> id lookup tables; num_to_char is handed to the example callback
# configured in learning().
# NOTE(review): `tf` and `vocabulary` are not imported explicitly in this module;
# presumably they come from one of the star imports above — confirm.
char_to_num, num_to_char = vocabulary()

# Allocate GPU memory on demand instead of reserving it all up front.
# set_memory_growth raises RuntimeError if the GPUs are already initialised;
# that case is reported but deliberately not treated as fatal.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

import os

# NOTE(review): tensorflow is already imported at the top of this file and most
# likely reads/caches TF_CPP_MIN_LOG_LEVEL during that import, so this assignment
# probably does not change its C++ log verbosity in this process ('0' is also
# TensorFlow's default). To change it, set the variable before the first
# tensorflow import — confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'  # or set to '1'
import logging

logging.getLogger('tensorflow').setLevel(logging.DEBUG)

max_num_frames = 150  # 5 seconds (presumably at 30 fps)
IMG_WIDTH = 100
IMG_HEIGHT = 50
def scheduler(epoch, lr, *, warmup_epochs=3, target_lr=2e-4, warmup_start=1e-5,
              decay_every=8, decay_rate=0.9):
    """Learning-rate schedule for ``LearningRateScheduler``: linear warmup, then step decay.

    For ``epoch < warmup_epochs`` the rate rises linearly from ``warmup_start``
    and reaches ``target_lr`` on the last warmup epoch. After that it is
    ``target_lr * decay_rate ** ((epoch - warmup_epochs) // decay_every)``.

    With the defaults:
    - epochs 0, 1, 2 give 1e-5, 1.05e-4 and 2e-4;
    - epochs 3-10 give 2e-4;
    - epochs 11-18 give 1.8e-4, and so on.

    Args:
        epoch: Zero-based epoch index supplied by Keras.
        lr: Current learning rate supplied by Keras. It is ignored, because the
            schedule depends only on ``epoch``.
        warmup_epochs: Number of warmup epochs (0 disables warmup).
        target_lr: Rate reached at the end of warmup, and the base of the decay.
        warmup_start: Rate used at epoch 0 of the warmup.
        decay_every: Number of epochs between decay steps after warmup. Must be >= 1.
        decay_rate: Multiplicative factor applied at each decay step.

    Returns:
        The learning rate for ``epoch`` as a Python float.

    Raises:
        ValueError: If ``decay_every`` is smaller than 1.
    """
    if decay_every < 1:
        raise ValueError(f"decay_every must be >= 1, got {decay_every}")
    if epoch < warmup_epochs:
        # Fraction of warmup completed; the last warmup epoch reaches exactly 1.0.
        span = warmup_epochs - 1
        progress = epoch / span if span > 0 else 1.0
        return float(warmup_start + (target_lr - warmup_start) * progress)
    # Number of completed decay steps since warmup ended.
    decays = (epoch - warmup_epochs) // decay_every
    return float(target_lr * decay_rate ** decays)
# Periodically back up the accumulated training history to disk.
class SaveHistoryCallback(tf.keras.callbacks.Callback):
    """Record every epoch's logs and periodically write them to a text file.

    Args:
        save_path: Directory that receives the ``history_epoch_<N>.txt`` dumps.
            It is created on demand.
        interval: Dump the history after every ``interval``-th epoch, counted
            from 1. Must be >= 1.

    Raises:
        ValueError: If ``interval`` is smaller than 1.
    """

    def __init__(self, save_path, interval=10):
        super().__init__()
        if interval < 1:
            raise ValueError(f"interval must be >= 1, got {interval}")
        self.save_path = save_path
        self.interval = interval
        self.history = {}  # metric name -> list of per-epoch values
        self._epochs_seen = 0  # epochs observed by this callback instance

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's logs and dump the history every ``interval`` epochs."""
        for key, value in (logs or {}).items():
            self.history.setdefault(key, []).append(value)
        self._epochs_seen += 1
        # Previously hard-coded to every 5 epochs, silently ignoring `interval`.
        if (epoch + 1) % self.interval == 0:
            self.save_history()

    def save_history(self):
        """Write the accumulated history, one ``key: [values]`` line per metric."""
        os.makedirs(self.save_path, exist_ok=True)
        # Epoch count used in the file name. The old code used len(history["loss"]),
        # which raised KeyError whenever no "loss" value had been logged.
        n_epochs = self._epochs_seen
        save_file = os.path.join(self.save_path, f'history_epoch_{n_epochs}.txt')
        with open(save_file, 'w') as f:
            for key, values in self.history.items():
                f.write(f'{key}: {values}\n')
        print(f'Saved history at epoch {n_epochs}')
# Print and save network predictions after every epoch.
class ProduceExample(tf.keras.callbacks.Callback):
    """After every epoch, decode one test batch and one train batch and save the results.

    For each split the callback:
    - prints ground truth vs. prediction for every sample;
    - writes ``train_res/<epoch:04d>/<split>/NN.txt`` (ground-truth line, then
      the prediction);
    - writes ``NN.png``, which renders the model output matrix in grayscale;
    - on every 10th epoch, also dumps the non-empty input frames as images.

    Args:
        train_ds, test_ds: Datasets yielding ``(inputs, targets)`` batches.
            ``inputs`` is a dict with the keys ``"frames"``, ``"labels"``,
            ``"input_len"`` and ``"label_len"``.
        infer_model: The base lip-reading model, not the CTC training wrapper.
        num_to_char: Layer that maps token ids back to characters (a StringLookup
            invert layer).
    """

    def __init__(self, train_ds, test_ds, infer_model, num_to_char) -> None:
        super().__init__()
        # Endless numpy iterators so every epoch can draw a fresh batch.
        self.train_iter = train_ds.repeat().as_numpy_iterator()
        self.test_iter = test_ds.repeat().as_numpy_iterator()
        self.infer_model = infer_model  # LipReadingModel, not the CTC wrapper
        self.num_to_char = num_to_char  # StringLookup invert layer

    def on_epoch_end(self, epoch, logs=None) -> None:
        print("\nPredicting from test set")
        self._sample_split(self.test_iter, epoch, "test")
        print("Predicting from train set")
        self._sample_split(self.train_iter, epoch, "train")
        gc.collect()

    def _sample_split(self, iterator, epoch, split):
        """Decode one batch from ``iterator`` and write it under train_res/<epoch>/<split>."""
        batch = next(iterator)
        results = self.predict_batch(batch)
        out_dir = os.path.join("train_res", str(epoch).zfill(4), split)
        self.write_data(out_dir, results)
        if (epoch + 1) % 10 == 0:
            self.write_imgs(out_dir, batch)

    def _ids_to_text(self, ids):
        """Map a 1-D array of token ids to a UTF-8 string via ``num_to_char``."""
        return tf.strings.reduce_join(self.num_to_char(ids)).numpy().decode("utf-8")

    def predict_batch(self, batch):
        """Run the inference model on ``batch`` and beam-search CTC-decode its output.

        Args:
            batch: ``(inputs, targets)``, where ``inputs`` has the keys ``"frames"``,
                ``"labels"``, ``"input_len"`` and ``"label_len"``. ``targets``
                is ignored.

        Returns:
            One ``[original_text, predicted_text, model_output]`` list per sample.
        """
        inputs, _ = batch
        features = inputs["frames"]      # [B, T, H, W, 1]
        labels = inputs["labels"]        # [B, max_L] padded token ids
        input_len = inputs["input_len"]  # [B, 1]
        label_len = inputs["label_len"]  # [B, 1]

        # Base model output; presumably [B, T, V] softmax probabilities.
        yhat = self.infer_model.predict(features)

        # ctc_decode takes one sequence length per sample as a 1-D array.
        # This previously referenced the undefined names `true_len` and `decoded`,
        # so every call raised NameError.
        # NOTE(review): assumes input_len counts steps of the model output, i.e. the
        # model does not downsample time — confirm.
        seq_lengths = input_len.reshape(-1)
        decoded = tf.keras.backend.ctc_decode(
            yhat, input_length=seq_lengths, greedy=False
        )[0][0].numpy()  # best path, dense [B, max_decoded_len]

        results = []
        for i in range(features.shape[0]):
            orig_str = self._ids_to_text(labels[i, :int(label_len[i, 0])])
            # ctc_decode pads its dense output with -1, and id 0 was already
            # treated as padding here, so keep only strictly positive ids.
            pred_ids = decoded[i]
            pred_str = self._ids_to_text(pred_ids[pred_ids > 0])

            print("Original is: ", orig_str)
            print("Prediction is:", pred_str)
            print("~" * 100)
            # Keep the raw per-sample output matrix for the image in write_data.
            results.append([orig_str, pred_str, yhat[i]])
        return results

    def write_data(self, dir, data):
        """Write one ``NN.txt`` and one ``NN.png`` per ``[original, prediction, output]`` entry."""
        os.makedirs(dir, exist_ok=True)
        for i, (orig_str, pred_str, probs, *_) in enumerate(data):
            with open(os.path.join(dir, f"{i:02d}.txt"), "w", encoding="utf-8") as f:
                f.write(orig_str + "\n")
                f.write(pred_str)
            # Scale probabilities to 0..255 (clipped so the uint8 cast cannot wrap),
            # enlarge 4x along both axes, then rotate 90 degrees before saving.
            img = (np.clip(probs, 0.0, 1.0) * 255).astype(np.uint8)
            img = np.repeat(np.repeat(img, 4, axis=0), 4, axis=1)
            plt.imsave(os.path.join(dir, f"{i:02d}.png"), np.rot90(img), cmap="gray")

    def write_imgs(self, dir, batch):
        """Save every input frame that is not entirely zero as ``<sample>_<frame>.png``."""
        inputs, _ = batch
        features = inputs["frames"]  # [B, T, H, W, 1]
        os.makedirs(dir, exist_ok=True)
        for i, clip in enumerate(features):
            for j, frame in enumerate(clip):
                img = frame[:, :, 0]
                # All-zero frames are skipped (presumably padding).
                if np.any(img != 0):
                    plt.imsave(os.path.join(dir, f"{i:02d}_{j:04d}.png"), img, cmap="gray")
# CTC loss function used to measure the network's quality.
def CTCLoss(y_true, y_pred):
    """Keras-style ``(y_true, y_pred)`` CTC loss using full tensor lengths.

    Every sample gets the full time dimension of ``y_pred`` as its input length
    and the full (padded) width of ``y_true`` as its label length.

    NOTE(review): if labels are padded, the padding positions are treated as
    real targets here. The training model in this file instead uses
    ``ctc_lambda_layer``, which takes true per-sample lengths.

    Returns:
        Per-sample CTC loss as returned by ``ctc_batch_cost``.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    column_of_ones = tf.ones(shape=(n_samples, 1), dtype="int64")

    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    padded_label_width = tf.cast(tf.shape(y_true)[1], dtype="int64")

    return tf.keras.backend.ctc_batch_cost(
        y_true,
        y_pred,
        time_steps * column_of_ones,
        padded_label_width * column_of_ones,
    )
def ctc_lambda_layer(args):
    """Compute the per-sample CTC loss inside a Keras ``Lambda`` layer.

    Args:
        args: The sequence ``[labels, y_pred, input_len, label_len]``:
            - labels: [B, max_L] token ids.
            - y_pred: [B, T, num_classes] softmax probabilities.
            - input_len: [B, 1] valid time steps per sample.
            - label_len: [B, 1] valid label tokens per sample.

    Returns:
        A [B, 1] tensor with the CTC loss of each sample.
    """
    labels, probs, input_len, label_len = args
    eps = K.epsilon()
    # ctc_batch_cost takes a log of the probabilities, so keep them strictly inside (0, 1).
    probs = tf.clip_by_value(probs, eps, 1.0 - eps)
    # Lengths are fed as int32; ctc_batch_cost accepts int32 lengths.
    return K.ctc_batch_cost(
        labels,
        probs,
        tf.cast(input_len, tf.int32),
        tf.cast(label_len, tf.int32),
    )
def build_ctc_training_model(max_num_frames, img_height, img_width, num_classes, max_num_tokens):
    """Wrap a LipReadingModel in a functional model whose single output is the CTC loss.

    Args:
        max_num_frames, img_height, img_width: Shape of one padded clip, which has
            one channel.
        num_classes: Not used by this function. It is kept for interface
            compatibility; the output size is whatever LipReadingModel produces.
        max_num_tokens: Padded label length.

    Returns:
        A tuple ``(train_model, lip_model)``:
        - train_model: inputs ``frames``, ``labels``, ``input_len`` and
          ``label_len``; its output, named ``ctc_loss``, is the per-sample CTC loss.
        - lip_model: the LipReadingModel instance that train_model calls
          internally.
    """
    clip_shape = (max_num_frames, img_height, img_width, 1)

    lip_model = LipReadingModel()
    # Build up front so the base model's shapes and weights are known.
    lip_model.build((None,) + clip_shape)

    frames_in = Input(shape=clip_shape, name="frames", dtype=tf.float32)
    labels_in = Input(shape=(max_num_tokens,), name="labels", dtype=tf.int32)
    input_len_in = Input(shape=(1,), name="input_len", dtype=tf.int64)
    label_len_in = Input(shape=(1,), name="label_len", dtype=tf.int64)

    probs = lip_model(frames_in)  # [B, T, num_classes]
    ctc_loss = Lambda(ctc_lambda_layer, name="ctc_loss")(
        [labels_in, probs, input_len_in, label_len_in]
    )  # [B, 1]

    train_model = Model(
        inputs=[frames_in, labels_in, input_len_in, label_len_in],
        outputs=ctc_loss,
        name="lipreading_ctc_train_model",
    )
    return train_model, lip_model
# Set up and run training.
def learning(feature_folder, num_epochs=25):
    """Build the CTC lip-reading model, train it and return the Keras History.

    Args:
        feature_folder: Not used by this function; data is always read from
            ``./features/exported``. It is kept for caller compatibility.
        num_epochs: Number of training epochs. Defaults to 25, the previously
            hard-coded value.

    Returns:
        The ``History`` object returned by ``train_model.fit``.
    """
    export_root = "./features/exported"
    stats_file = "./features/exported/global_stats.json"

    # Make sure the output directories exist before any callback writes into them.
    os.makedirs('./weights', exist_ok=True)
    os.makedirs('./history_logs', exist_ok=True)

    # NOTE(review): max_num_tokens is not defined in this module's visible code;
    # presumably it comes from one of the star imports — confirm.
    pad_x = (max_num_frames, IMG_HEIGHT, IMG_WIDTH, 1)
    pad_y = (max_num_tokens,)

    # Data pipelines with real lengths
    train_data = make_stream(export_root, "train",
                             batch_size=8,
                             pad_x=pad_x,
                             pad_y=pad_y,
                             stats_json=stats_file)
    test_data = make_stream(export_root, "test",
                            batch_size=2,
                            pad_x=pad_x,
                            pad_y=pad_y,
                            stats_json=stats_file,
                            shuffle_files=False)

    # num_classes should match the model's final Dense output size
    # (typically len(char_to_num.get_vocabulary())).
    char_to_num, _ = vocabulary()
    num_classes = len(char_to_num.get_vocabulary())
    train_model, base_model = build_ctc_training_model(
        max_num_frames=max_num_frames,
        img_height=IMG_HEIGHT,
        img_width=IMG_WIDTH,
        num_classes=num_classes,
        max_num_tokens=max_num_tokens,
    )

    # The LearningRateScheduler callback below sets the rate at the start of
    # every epoch, so this initial value is effectively a placeholder.
    optimizer = Adam(learning_rate=2e-4, clipnorm=1.0)
    # The model's only output is the per-sample CTC loss (layer "ctc_loss"),
    # so the compiled loss just averages it and ignores the dummy targets.
    train_model.compile(
        optimizer=optimizer,
        loss={"ctc_loss": lambda y_true, y_pred: tf.reduce_mean(y_pred)},
        jit_compile=False,
    )
    train_model.summary()

    # NOTE(review): these checkpoints store train_model's weights, not
    # base_model's. HDF5 weight loading in Keras 2 matches by layer order unless
    # by_name=True, so loading them straight into a fresh LipReadingModel may
    # fail. Consider also calling base_model.save_weights(...) — confirm.
    best_model_callback = ModelCheckpoint(
        filepath='./weights/best_model.weights.h5',
        monitor='val_loss',
        save_best_only=True,
        save_weights_only=True,
        mode='min',
        verbose=1
    )
    latest_model_callback = ModelCheckpoint(
        filepath='./weights/latest_model.weights.h5',
        save_weights_only=True,
        save_freq='epoch',
        verbose=1
    )
    schedule_callback = LearningRateScheduler(scheduler)
    example_callback = ProduceExample(
        train_data,
        test_data,
        infer_model=base_model,
        num_to_char=num_to_char
    )
    save_history_callback = SaveHistoryCallback(
        save_path='./history_logs',
        interval=10
    )

    history = train_model.fit(
        train_data,
        validation_data=test_data,
        epochs=num_epochs,
        callbacks=[
            best_model_callback,
            latest_model_callback,
            schedule_callback,
            example_callback,
            save_history_callback,
        ]
    )
    return history