import warnings
from collections import defaultdict, Counter
from operator import itemgetter

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import nltk
import spacy  # required by pos_tags_req_spacy below
import tensorflow_hub as hub
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.tokenize import RegexpTokenizer
from gensim.corpora import Dictionary
from sentence_transformers import SentenceTransformer, util
from scipy.spatial.distance import cosine
from transformers import AutoModel, AutoTokenizer
from wordcloud import WordCloud
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.neighbors import kneighbors_graph
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import precision_recall_fscore_support, classification_report, confusion_matrix
from sklearn.feature_extraction.text import TfidfVectorizer

nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

warnings.filterwarnings("ignore")

# Declarations: load the Universal Sentence Encoder once at module level.
module_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
USE_model = hub.load(module_url)


def cal_cosine_bert_tf(df):
    """Cosine-similarity matrix over concatenated SBERT and TF-IDF vectors."""
    print("BERT embeddings\n")
    model_nli_means_tokens = SentenceTransformer('sentence-transformers/all-distilroberta-v1')
    BERT_open_coss = model_nli_means_tokens.encode(df.requirement)
    tfidfconvert = TfidfVectorizer(ngram_range=(1, 4)).fit(df.requirement)
    # np.asarray: sklearn rejects the np.matrix that .todense() returns
    open_cos_tfidf = np.asarray(tfidfconvert.transform(df.requirement).todense())
    BERT_TFIDF_open_coss = np.hstack((BERT_open_coss, open_cos_tfidf))
    return cosine_similarity(BERT_TFIDF_open_coss, BERT_TFIDF_open_coss)


def cal_cosine_use(df):
    """Cosine-similarity matrix over Universal Sentence Encoder embeddings."""
    print("Universal Sentence Encoder\n")
    USE_open_coss = USE_model(list(df.requirement))  # TF Hub expects a list/tensor of strings
    return cosine_similarity(USE_open_coss, USE_open_coss)


def cal_cosine_sim(df):
    """Cosine-similarity matrix over plain TF-IDF vectors (1- to 4-grams)."""
    print("TF-IDF embeddings\n")
    tf_idf = TfidfVectorizer(ngram_range=(1, 4)).fit(df.requirement)
    tf_idf_vector = np.asarray(tf_idf.transform(df.requirement).todense())
    return cosine_similarity(tf_idf_vector, tf_idf_vector)


def find_closest_match_v2(df, cos_sim_matrix, lookup, cluster):
    """Rank every other requirement by similarity to row `lookup`.

    Similarities >= 0.99995 are treated as self-matches and skipped.
    The `cluster` argument is unused but kept for call-site compatibility.
    """
    cos_sim = []
    text = []
    idx = []
    for i in range(len(cos_sim_matrix)):
        if cos_sim_matrix[lookup, i] < 0.99995:
            cos_sim.append(cos_sim_matrix[lookup, i])
            text.append(df.requirement.iloc[i])
            idx.append(df.idx.iloc[i])
    # Build the columns directly so Cos_sim stays numeric; np.array([...]).T
    # would cast everything to strings and make the sort lexicographic.
    generated_df = pd.DataFrame({'idx': idx, 'Cos_sim': cos_sim, 'text': text})
    generated_df.sort_values(by='Cos_sim', ascending=False, inplace=True)
    generated_df.reset_index(drop=True, inplace=True)
    return generated_df


def plot_roc_curve(fpr, tpr):
    """Plot an ROC curve against the chance diagonal."""
    plt.subplots(figsize=(8, 6))
    sns.set_context("paper", font_scale=2.0)
    plt.plot(fpr, tpr, color='orange', label='ROC')
    plt.plot([0, 1], [0, 1], color='darkblue', linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.tight_layout()
    plt.legend()
    plt.show()
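# Usage sketch (illustrative, not part of the pipeline): the three backends
# above all return an (n, n) cosine-similarity matrix for a DataFrame with a
# 'requirement' text column. The toy sentences below are hypothetical.
def _demo_similarity_backends():
    toy = pd.DataFrame({'requirement': [
        'The system shall log every user login.',
        'The system shall record all user logins.',
        'The UI shall display the current time.',
    ]})
    sim = cal_cosine_sim(toy)  # TF-IDF backend; swap in cal_cosine_bert_tf / cal_cosine_use
    print(sim.round(2))        # the first two rows should score higher with each other than with the third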
def find_optimal_cutoff(df, embeddings):
    """Grid-search cutoffs j/100 for j in 0..99 and return the index that
    maximises macro recall on the training frame."""
    print("Training :", df.shape)
    f1score_w = []
    if embeddings == 'tfidf':
        cos_matrix = cal_cosine_sim(df)
    if embeddings == 'bert':
        cos_matrix = cal_cosine_bert_tf(df)
    if embeddings == 'use':
        cos_matrix = cal_cosine_use(df)
    df['y_hat'] = 0
    for j in range(100):
        temp = []
        for i in range(len(df)):
            inspect = find_closest_match_v2(df, cos_matrix, i, 0)
            if float(inspect.iloc[0].Cos_sim) > j / 100:
                temp.append('Yes')
            else:
                temp.append('No')
        df['y_hat'] = pd.Series(temp).values
        f1score_w.append(precision_recall_fscore_support(df.conflict.values, df.y_hat.values,
                                                         average='macro'))
    f1score_w = np.array(f1score_w)
    # Column 1 of precision_recall_fscore_support is macro recall.
    return np.where(f1score_w[:, 1] == f1score_w[:, 1].max())[0][0]


def find_conflict_detect(train_df, embeddings=0):
    """Sweep cutoffs 0.00-0.99, record (TPR, FPR) per cutoff, and plot the ROC curve."""
    df = train_df.copy()
    df = df.reset_index(drop=True)
    FPR = []
    TPR = []
    cos_dict = {}
    df['y_hat'] = 0
    cos_matrix = np.zeros((df.shape[0], df.shape[0]))
    if embeddings == 1:
        cos_matrix = cal_cosine_sim(df)
    elif embeddings == 2:
        cos_matrix = cal_cosine_bert_tf(df)
    elif embeddings == 3:
        cos_matrix = cal_cosine_use(df)
    else:
        print("invalid id for embeddings\n")
    for k in range(100):
        for i in range(len(df)):
            inspect = find_closest_match_v2(df, cos_matrix, i, 0)
            if float(inspect.iloc[0].Cos_sim) >= k / 100:
                df.loc[i, 'y_hat'] = 'Yes'
            else:
                df.loc[i, 'y_hat'] = 'No'
        y_true = df.conflict.values
        y_pred = df.y_hat.values
        data = confusion_matrix(y_true, y_pred)
        df_cm = pd.DataFrame(data, columns=np.unique(y_true), index=np.unique(y_true))
        df_cm.index.name = 'Actual'
        df_cm.columns.name = 'Predicted'
        tpr = df_cm.iloc[1, 1] / (df_cm.iloc[1, 1] + df_cm.iloc[1, 0])
        fpr = df_cm.iloc[0, 1] / (df_cm.iloc[0, 1] + df_cm.iloc[0, 0])
        TPR.append(tpr)
        FPR.append(fpr)
        cos_dict[k] = [tpr, fpr]
    plot_roc_curve(FPR, TPR)
    return cos_dict


def find_optimal_cutoff_withroc(cos_dict):
    """Pick the cutoff whose TPR is closest to 1 - FPR (nearest the ROC diagonal)."""
    fpr = []
    tpr = []
    threshold = []
    for key, val in cos_dict.items():
        threshold.append(key / 100)
        tpr.append(val[0])
        fpr.append(val[1])
    i = np.arange(len(tpr))
    tf = np.array(tpr) - (1 - np.array(fpr))  # signed distance from the TPR = 1 - FPR line
    roc = pd.DataFrame({'tf': pd.Series(tf, index=i), 'threshold': pd.Series(threshold, index=i)})
    roc_t = roc.iloc[(roc.tf - 0).abs().argsort()[:1]]
    return float(roc_t['threshold'].iloc[0])


def test_cutoff(test_df, cutoff, embeddings=0):
    """Label the test frame at a fixed cutoff; return it plus the 'Yes' candidate set."""
    temp = []
    df = test_df.copy()
    df = df.reset_index(drop=True)
    cos_matrix = np.zeros((df.shape[0], df.shape[0]))
    if embeddings == 1:
        cos_matrix = cal_cosine_sim(df)
    elif embeddings == 2:
        cos_matrix = cal_cosine_bert_tf(df)
    elif embeddings == 3:
        cos_matrix = cal_cosine_use(df)
    else:
        print("invalid id for embeddings\n")
    for i in range(len(df)):
        inspect = find_closest_match_v2(df, cos_matrix, i, 0)
        if float(inspect.iloc[0].Cos_sim) >= cutoff:
            temp.append('Yes')
        else:
            temp.append('No')
    df['y_hat'] = temp
    print(classification_report(df.conflict.values, df.y_hat.values, digits=6))
    print("********** Confusion Matrix for this fold *************\n")
    cf = confusion_matrix(df.conflict.values, df.y_hat.values)
    print(cf)
    df_cm = pd.DataFrame(cf, columns=np.unique(df.conflict.values),
                         index=np.unique(df.conflict.values))
    df_cm.index.name = 'Actual'
    df_cm.columns.name = 'Predicted'
    tpr = df_cm.iloc[1, 1] / (df_cm.iloc[1, 1] + df_cm.iloc[1, 0])
    print("The TPR for this fold is:", tpr)
    candidate_set = df.loc[df['y_hat'] == 'Yes']
    return df, candidate_set
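# Usage sketch (illustrative): how the cutoff helpers above chain together on
# a labelled train/test split. The split itself and the DataFrame columns
# ('idx', 'requirement', 'conflict') are assumptions read off the code above.
def _demo_threshold_selection(train_df, test_df):
    cos_dict = find_conflict_detect(train_df, embeddings=1)  # sweep cutoffs, plot ROC
    cutoff = find_optimal_cutoff_withroc(cos_dict)           # cutoff nearest TPR = 1 - FPR
    labeled_df, candidate_set = test_cutoff(test_df, cutoff, embeddings=1)
    return labeled_df, candidate_set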
def count_unique_nouns(requirement):
    """Set of non-verb (noun-like) tokens in a requirement."""
    noun_counter = []
    tagged_requirement = pos_tags_req(requirement)
    for word, tag in tagged_requirement:
        if 'V' not in tag:  # skip verb tags; lemmatization could generalise this further
            noun_counter.append(word)
    return set(noun_counter)


def count_similar_nouns(requirement, conflict_candidate):
    """Count noun tokens shared between two requirements (with multiplicity)."""
    similar_nouns_counter = []
    tagged_requirement = pos_tags_req(requirement)
    tagged_conflict_candidate = pos_tags_req(conflict_candidate)
    for word, tag in tagged_requirement:
        if 'V' not in tag:  # skip verb tags
            for conflict_word, conflict_tag in tagged_conflict_candidate:
                if word == conflict_word and 'V' not in conflict_tag:
                    similar_nouns_counter.append((word, conflict_word))
    return len(similar_nouns_counter)


def get_all_noun_count(con_sim, i):
    """Unique-noun sets of the six most similar requirements for row i."""
    return [count_unique_nouns(str(con_sim[f'similar_{k}'].iloc[i])) for k in range(1, 7)]


def get_all_similar_noun_count(con_sim, i):
    """Total noun overlap between the top match (similar_1) and all six matches.

    Note that the k = 1 term compares similar_1 with itself.
    """
    total = 0
    for k in range(1, 7):
        total += count_similar_nouns(str(con_sim[f'similar_{k}'].iloc[i]),
                                     str(con_sim['similar_1'].iloc[i]))
    return total


def get_similar_noun_count(con_sim):
    similar_noun_column = []
    for i in range(con_sim.shape[0]):
        similar_noun_column.append(get_all_similar_noun_count(con_sim, i))
    return similar_noun_column


def get_unique_noun_count(con_sim):
    unique_noun_column = []
    for i in range(con_sim.shape[0]):
        noun_sets = get_all_noun_count(con_sim, i)
        total_unique_nouns = set()
        for noun_set in noun_sets:  # union of the six per-requirement noun sets
            total_unique_nouns.update(noun_set)
        unique_noun_column.append(len(total_unique_nouns))
    return unique_noun_column


def get_ratio(df):
    """Second-stage labelling: 'Yes' when shared nouns >= unique nouns across
    a candidate's six nearest requirements."""
    con_sim = df.copy()
    con_sim = con_sim.reset_index(drop=True)
    con_sim['similar_noun'] = get_similar_noun_count(con_sim)
    con_sim['unique_noun'] = get_unique_noun_count(con_sim)
    con_sim['noun_ratio'] = con_sim['similar_noun'] / con_sim['unique_noun']
    con_sim['second_label'] = 0
    for i in range(len(con_sim)):
        if float(con_sim.iloc[i].noun_ratio) >= 1:
            con_sim.loc[i, 'second_label'] = 'Yes'
        else:
            con_sim.loc[i, 'second_label'] = 'No'
    return con_sim


def get_dict_withid_req(df):
    """Map requirement idx -> requirement text."""
    i_dict = {}
    for i in range(df.shape[0]):
        i_dict[df.iloc[i].idx] = df.iloc[i].requirement
    return i_dict
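# Usage sketch (illustrative): the noun-overlap heuristic in isolation. Two
# requirements about the same entities share most non-verb tokens, so the
# similar/unique noun ratio used by get_ratio() approaches 1. The example
# sentences are hypothetical.
def _demo_noun_overlap():
    a = 'The system shall encrypt all stored passwords.'
    b = 'The system shall store all passwords without encryption.'
    print('unique nouns in a:  ', count_unique_nouns(a))
    print('nouns shared by a/b:', count_similar_nouns(a, b))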
def cosine_sim(text1, text2):
    """Pairwise TF-IDF cosine similarity between two texts."""
    vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, 4))
    tfidf = vectorizer.fit_transform([text1, text2])
    return (tfidf * tfidf.T).toarray()[0, 1]  # .toarray() instead of the deprecated .A


def get_cosine_all(con_dict, all_req):
    """Cosine similarity of every conflict candidate against every requirement."""
    final_dict = defaultdict(list)
    for key1, val1 in con_dict.items():
        temp_list = []
        for key2, val2 in all_req.items():
            temp_list.append((key2, cosine_sim(val1, val2)))
        final_dict[key1].append(temp_list)
    return final_dict


def get_req_sorted(final_dict):
    """Keep the six most similar requirement ids per conflict candidate."""
    sorted_dict = {}
    best_dict = {}
    for key in final_dict:
        sorted_dict[key] = sorted(final_dict[key][0], key=itemgetter(1), reverse=True)
    for key, val in sorted_dict.items():
        sorted_dict[key] = val[:6]
    for key in sorted_dict:
        best_dict[key] = [pair[0] for pair in sorted_dict[key]]
    return best_dict


def pos_tags_req(text):
    """POS-tag a requirement and keep only noun and finite-verb tags."""
    tokens = nltk.word_tokenize(text)
    tagged = nltk.pos_tag(tokens)
    # Drop square brackets left over from array-valued DataFrame cells.
    tags = [(word, tag) for word, tag in tagged if word not in ["[", "]"]]
    kept_tags = {'NN', 'NNS', 'NNP', 'NNPS', 'VBD', 'VBN', 'VBP', 'VBZ'}
    return [(word, tag) for word, tag in tags if tag in kept_tags]


def pos_tags_req_spacy(text):
    """Debugging helper: print spaCy POS tags (unused by the pipeline)."""
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(text)
    for token in doc:
        print(token, token.pos_)


def create_dataframe(all_req_df, candidate, best_dict):
    """One row per conflict candidate with its six most similar requirements."""
    columns = ['conflict_candidate', 'similar_1', 'similar_2', 'similar_3',
               'similar_4', 'similar_5', 'similar_6']
    index = list(range(candidate.shape[0]))
    con_sim = pd.DataFrame(columns=columns, index=index)
    con_sim['conflict_candidate'] = list(best_dict.keys())
    for k in range(6):
        con_sim[f'similar_{k + 1}'] = [all_req_df['requirement'][all_req_df['idx'] == v[k]].values
                                       for v in best_dict.values()]
    con_sim['true_label'] = candidate[['conflict']][candidate['y_hat'] == 'Yes'].values
    con_sim['predicted_label'] = candidate[['y_hat']][candidate['y_hat'] == 'Yes'].values
    return con_sim


def final_conflict(req_df, candidate_set, test_df):
    """Relabel the candidate set with the noun-ratio heuristic and report metrics."""
    candidate = candidate_set.copy()
    con_re_dict = get_dict_withid_req(candidate)
    all_req_dict = get_dict_withid_req(req_df)
    con_original_cosine_all = get_cosine_all(con_re_dict, all_req_dict)
    con_best_dict = get_req_sorted(con_original_cosine_all)
    con_sim = create_dataframe(req_df, candidate, con_best_dict)
    final_df = get_ratio(con_sim)
    ids = final_df['conflict_candidate'].values
    for i in range(len(test_df)):
        l = test_df.iloc[i].idx
        if l in ids:
            k = final_df.index[final_df['conflict_candidate'] == l].to_list()
            test_df.loc[i, 'y_hat'] = final_df.iloc[k[0]].second_label
    print(classification_report(test_df.conflict.values, test_df.y_hat.values, digits=6))
    print("********** Confusion Matrix for this fold *************\n")
    print(confusion_matrix(test_df.conflict.values, test_df.y_hat.values))
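# End-to-end sketch (illustrative): the call order implied by the functions
# above, assuming req_df holds every requirement and train/test are labelled
# splits of it. The variable names here are assumptions, not part of the
# original code.
def _demo_pipeline(req_df, train_df, test_df):
    cos_dict = find_conflict_detect(train_df, embeddings=2)   # BERT + TF-IDF backend
    cutoff = find_optimal_cutoff_withroc(cos_dict)
    labeled_test, candidate_set = test_cutoff(test_df, cutoff, embeddings=2)
    final_conflict(req_df, candidate_set, labeled_test)       # noun-ratio second pass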