import os
import sys
import time

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


# This is a good set of 20 test cases, as they're mostly internally
# consistent and present in most speakers:
# 014 should always be included
# 002, 012, 013, 016, 018, 020, 032, 038, 043, 049, 054, 056, 062, 064,
# 067, 081, 083, 084, 094
class VoiceRecognitionDataset(Dataset):
    test_samples = ["014", "002", "012", "013", "016", "018", "020", "032",
                    "038", "043", "049", "054", "056", "062", "064", "067",
                    "081", "083", "084", "094"]

    def __init__(self, data_path, is_testdata):
        self.data_path = data_path
        self.num_speakers = len(os.listdir(data_path))
        self.speaker_list = []
        self.utterance_count = 0
        self.utterance_list = []
        self.audio_sample_list = []

        # Search through all audio folders, one per speaker (e.g. p274).
        for speaker_dir in os.listdir(data_path):
            self.speaker_list.append(speaker_dir)
            for file in os.listdir(os.path.join(data_path, speaker_dir)):
                if file == "log.txt":
                    continue
                # Assumes VCTK-style names such as p274_014_mic2.<ext>:
                # these slices pick out the utterance and mic numbers.
                file_num = file[5:8]
                mic_num = file[-5:-4]
                if mic_num != "2":
                    continue
                # An utterance belongs in this split exactly when its
                # membership in test_samples matches is_testdata.
                if (file_num in self.test_samples) == is_testdata:
                    path = os.path.join(data_path, speaker_dir, file)
                    self.utterance_count += 1
                    self.utterance_list.append((speaker_dir, path))
                    # Pre-load every saved mel spectrogram, transposed to
                    # (time, mel) with the channel dimension dropped.
                    self.audio_sample_list.append(
                        torch.transpose(torch.load(path), 1, 2).squeeze(0))
        print("Number of speakers: " + str(len(self.speaker_list)))

    def __len__(self):
        return self.utterance_count

    def __getitem__(self, idx):
        speaker, _ = self.utterance_list[idx]
        mel_spec = self.audio_sample_list[idx]
        return mel_spec, self.get_speaker_id(speaker)

    def get_speaker_id(self, speaker):
        return self.speaker_list.index(speaker)


def collate_fn_padded(batch):
    '''
    Pads a batch of variable-length spectrograms.
    Note: it converts things to tensors manually here since the ToTensor
    transform assumes it takes in images rather than arbitrary tensors.
    '''
    ## get labels
    labels = torch.tensor([y for (x, y) in batch])
    ## pad to the length of the longest spectrogram in the batch
    padded = torch.nn.utils.rnn.pad_sequence(
        [x.to(device) for (x, y) in batch], batch_first=True)
    ## compute mask
    #mask = (padded != 0).to(device)
    return padded, labels  #, lengths, mask


def prepare_data(data_path):
    training_data = VoiceRecognitionDataset(data_path, is_testdata=False)
    test_data = VoiceRecognitionDataset(data_path, is_testdata=True)
    train_dataloader = DataLoader(training_data, batch_size=32, shuffle=True,
                                  collate_fn=collate_fn_padded)
    test_dataloader = DataLoader(test_data, batch_size=32, shuffle=False,
                                 collate_fn=collate_fn_padded)
    return train_dataloader, test_dataloader


device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")
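
# The dataset above expects each speaker folder to hold pre-computed mel
# spectrograms saved with torch.save. Below is a minimal sketch of that
# preprocessing step, assuming VCTK-style .flac recordings and 128 mel bins
# to match the LSTM's input_size further down; torchaudio, the paths, and
# the parameters here are assumptions, not part of the original pipeline.
def precompute_mel_spectrograms(wav_root, out_root, n_mels=128):
    import torchaudio  # assumed dependency for this sketch

    mel_transform = None
    for speaker_dir in os.listdir(wav_root):
        os.makedirs(os.path.join(out_root, speaker_dir), exist_ok=True)
        for file in os.listdir(os.path.join(wav_root, speaker_dir)):
            if not file.endswith(".flac"):
                continue
            waveform, sample_rate = torchaudio.load(
                os.path.join(wav_root, speaker_dir, file))
            # Build the transform lazily, once the (assumed uniform)
            # sample rate is known.
            if mel_transform is None:
                mel_transform = torchaudio.transforms.MelSpectrogram(
                    sample_rate=sample_rate, n_mels=n_mels)
            mel = mel_transform(waveform)  # (channel, n_mels, time)
            # A three-character extension keeps the file[-5:-4] mic-number
            # slice in VoiceRecognitionDataset pointing at the right spot.
            out_name = file.replace(".flac", ".mel")
            torch.save(mel, os.path.join(out_root, speaker_dir, out_name))
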
# Model's basic structure
#
# Should have BatchNorm after every layer it seems?
# Mel Spectrogram -> Convolutional Block -> (B?)LSTM -> Linear Block -> Encoding -> Prediction
class VoiceRecognitionModel(nn.Module):
    def __init__(self, num_speakers):
        super().__init__()
        self.num_speakers = num_speakers
        self.convolution_block = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding='same'),
            nn.BatchNorm2d(32)
        )
        # Appends one hidden conv layer; widen the range to deepen the block.
        for i in range(0, 1):
            self.convolution_block.append(nn.Sequential(
                nn.Conv2d(32, 32, 3, padding='same'),
                nn.BatchNorm2d(32)
            ))
        self.convolution_block.append(nn.Sequential(
            nn.Conv2d(32, 1, 3, padding='same'),
            nn.BatchNorm2d(1)
        ))
        self.blstm = nn.LSTM(input_size=128, hidden_size=128,
                             bidirectional=False, batch_first=True)
        self.blstm_batchnorm = nn.BatchNorm1d(128)
        self.linear_block = nn.Sequential(
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.BatchNorm1d(32),
            nn.ReLU()
        )
        self.encoding = nn.Sequential(
            nn.Linear(32, 16),  # what size?
            nn.BatchNorm1d(16),
            nn.ReLU()
        )
        # LogSoftmax rather than Softmax so the output is the
        # log-probabilities that NLLLoss expects.
        self.prediction = nn.Sequential(
            nn.Linear(16, self.num_speakers),  # how many speakers are there?
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        # (B, time, mel) -> (B, 1, time, mel) for the 2d convolutions.
        x = x.unsqueeze(1)
        x_hat = self.convolution_block(x)
        # Squeeze only the channel dim so a batch of one keeps its batch dim.
        x_hat = x_hat.squeeze(1)
        output, (h_n, c_n) = self.blstm(x_hat)
        # h_n is (num_layers, B, hidden); use the final layer's hidden state.
        x_hat = self.blstm_batchnorm(h_n.squeeze(0))
        x_hat = self.linear_block(x_hat)
        x_hat = self.encoding(x_hat)
        pred = self.prediction(x_hat)
        return pred


#probably correct?
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    start = time.time()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
    end = time.time()
    print("Epoch Time: " + str((end - start) / 60) + " minutes.")
    return True


#probably correct?
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


if __name__ == "__main__":
    train_dataloader, test_dataloader = prepare_data(sys.argv[1])  # put proper path here

    model = VoiceRecognitionModel(num_speakers=110)
    model.to(device)
    loss_fn = nn.NLLLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    print("Model:")
    print(model)

    epochs = 100
    for t in range(epochs):
        print(f"Epoch {t+1}\n-------------------------------")
        train(train_dataloader, model, loss_fn, optimizer)
        test(test_dataloader, model, loss_fn)
    print("Done!")

    torch.save(model.state_dict(), "model.pth")
    print("Saved PyTorch Model State to model.pth")
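
# A minimal sketch of reloading the saved checkpoint for inference, assuming
# the same data layout as above; predict_speaker and its argument names are
# illustrative additions, not part of the original script.
def predict_speaker(checkpoint_path, dataloader):
    model = VoiceRecognitionModel(num_speakers=110)
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.to(device)
    model.eval()
    X, y = next(iter(dataloader))
    with torch.no_grad():
        pred = model(X.to(device))
    # The prediction head emits log-probabilities, so argmax recovers the
    # most likely speaker index for each utterance in the batch.
    print("predicted:", pred.argmax(1).tolist())
    print("actual:   ", y.tolist())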