import pandas as pd
import numpy as np
import tensorflow_hub as hub
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.tokenize import RegexpTokenizer
from gensim.corpora import Dictionary
from sentence_transformers import SentenceTransformer, util
import matplotlib.pyplot as plt
import torch
import nltk
from collections import defaultdict
from operator import itemgetter
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
from scipy.spatial.distance import cosine
from transformers import AutoModel, AutoTokenizer
import seaborn as sns
from collections import Counter
from wordcloud import WordCloud
from sklearn.cluster import AgglomerativeClustering
from sklearn.neighbors import kneighbors_graph
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import warnings
warnings.filterwarnings("ignore")
# Declarations: load the Universal Sentence Encoder once at module import.
module_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
USE_model = hub.load(module_url)
def cal_cosine_bert_tf(df):
    # Concatenate SBERT sentence embeddings with TF-IDF n-gram features,
    # then compute the pairwise cosine-similarity matrix.
    print("bert embeddings\n")
    model_nli_means_tokens = SentenceTransformer('sentence-transformers/all-distilroberta-v1')
    BERT_open_coss = model_nli_means_tokens.encode(df.requirement.tolist())
    tfidfconvert = TfidfVectorizer(ngram_range=(1, 4)).fit(df.requirement)
    open_cos_tfidf = tfidfconvert.transform(df.requirement).toarray()
    BERT_TFIDF_open_coss = np.hstack((BERT_open_coss, open_cos_tfidf))
    open_coss_cos_sim_matrix_cluster_0 = cosine_similarity(BERT_TFIDF_open_coss, BERT_TFIDF_open_coss)
    return open_coss_cos_sim_matrix_cluster_0
def cal_cosine_use(df):
    # Embed with the Universal Sentence Encoder and compute pairwise cosines.
    print("Universal sentence encoder \n")
    USE_open_coss = np.asarray(USE_model(df.requirement.tolist()))
    USE_open_coss_cos_sim_matrix = cosine_similarity(USE_open_coss, USE_open_coss)
    return USE_open_coss_cos_sim_matrix
def cal_cosine_sim(df):
    # TF-IDF (1- to 4-gram) pairwise cosine-similarity matrix.
    print("TFIDF embeddings \n")
    tf_idf = TfidfVectorizer(ngram_range=(1, 4)).fit(df.requirement)
    tf_idf_vector = tf_idf.transform(df.requirement).toarray()
    cos_sim = cosine_similarity(tf_idf_vector, tf_idf_vector)
    return cos_sim
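# A minimal sketch (hypothetical toy data) of how the similarity-matrix
# builders above are driven. Only the TF-IDF variant is exercised here since
# it needs no model download; cal_cosine_bert_tf or cal_cosine_use should
# drop in the same way once their models are available.
def _demo_similarity_matrix():
    toy = pd.DataFrame({
        'idx': [1, 2, 3],
        'requirement': [
            "The system shall log every user login attempt.",
            "The system shall record all user login attempts.",
            "The UI shall display the current battery level.",
        ],
    })
    sim = cal_cosine_sim(toy)
    # The two near-duplicate requirements should dominate the off-diagonal.
    print(np.round(sim, 3))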
def find_closest_match_v2(df, cos_sim_matrix, lookup, cluster):
    # Collect every requirement except (near-)exact matches of the lookup row
    # (cosine >= 0.99995 is treated as the row itself) and rank the rest by
    # similarity. Keeping Cos_sim numeric makes the sort correct even for
    # values that would stringify in scientific notation. `cluster` is kept
    # for call-site compatibility but is unused. The result is empty if every
    # other row is a near-exact duplicate.
    cos_sim = []
    text = []
    idx = []
    for i in range(len(cos_sim_matrix)):
        if cos_sim_matrix[lookup, i] < 0.99995:
            cos_sim.append(cos_sim_matrix[lookup, i])
            text.append(df.requirement.iloc[i])
            idx.append(df.idx.iloc[i])
    generated_df = pd.DataFrame({'idx': idx, 'Cos_sim': cos_sim, 'text': text})
    generated_df.sort_values(by='Cos_sim', ascending=False, inplace=True)
    generated_df.reset_index(drop=True, inplace=True)
    return generated_df
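# A quick sketch (toy data) of find_closest_match_v2: rank every other
# requirement by similarity to row 0 and show the best match first.
def _demo_closest_match():
    toy = pd.DataFrame({
        'idx': [10, 11, 12],
        'requirement': [
            "The system shall encrypt stored passwords.",
            "Stored passwords shall be encrypted by the system.",
            "The device shall power off after ten idle minutes.",
        ],
    })
    sim = cal_cosine_sim(toy)
    print(find_closest_match_v2(toy, sim, lookup=0, cluster=0))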
def plot_roc_curve(fpr, tpr):
    # Plot the ROC trace produced by the cutoff sweep, with a diagonal
    # chance line for reference.
plt.subplots(figsize=(8,6))
#sns.set(rc={"grid.linewidth": 0.2})
sns.set_context("paper", font_scale=2.0)
plt.plot(fpr, tpr, color='orange', label='ROC')
plt.plot([0, 1], [0, 1], color='darkblue', linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
#plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.tight_layout()
plt.legend()
plt.show()
def find_optimal_cutoff(df, embeddings):
    # Sweep cosine cutoffs 0.00-0.99 on the training split and return the
    # index (cutoff * 100) that maximises the macro F1-score.
    print("Training :", df.shape)
    f1score_w = []
    if embeddings == 'tfidf':
        cos_matrix = cal_cosine_sim(df)
    elif embeddings == 'bert':
        cos_matrix = cal_cosine_bert_tf(df)
    elif embeddings == 'use':
        cos_matrix = cal_cosine_use(df)
    for j in range(100):
        temp = []
        for i in range(len(df)):
            inspect = find_closest_match_v2(df, cos_matrix, i, 0)
            temp.append('Yes' if float(inspect.iloc[0].Cos_sim) > j / 100 else 'No')
        df['y_hat'] = pd.Series(temp).values
        # precision_recall_fscore_support returns (precision, recall,
        # f-score, support); index 2 is the macro F1 we optimise for.
        f1score_w.append(precision_recall_fscore_support(
            df.conflict.values, df.y_hat.values, average='macro')[2])
    return int(np.argmax(f1score_w))
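# A hedged usage sketch for the F1-based sweep on a tiny labelled frame
# (hypothetical data following the 'idx'/'requirement'/'conflict' schema the
# functions above expect); the returned integer is the best cutoff times 100.
def _demo_f1_sweep():
    toy = pd.DataFrame({
        'idx': [1, 2, 3, 4],
        'requirement': [
            "The system shall log every user login attempt.",
            "The system shall record all user login attempts.",
            "The UI shall display the current battery level.",
            "The device shall power off after ten idle minutes.",
        ],
        'conflict': ['Yes', 'Yes', 'No', 'No'],
    })
    best = find_optimal_cutoff(toy, embeddings='tfidf')
    print("best cutoff:", best / 100)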
def find_conflict_detect(train_df, embeddings=0):
    # Sweep cosine cutoffs 0.00-0.99, recording (TPR, FPR) at each cutoff,
    # and plot the resulting ROC curve.
    # embeddings: 1 = TF-IDF, 2 = BERT + TF-IDF, 3 = USE.
    df = train_df.copy()
    df = df.reset_index(drop=True)
    FPR = []
    TPR = []
    cos_dict = {}
    df['y_hat'] = 'No'
    cos_matrix = np.zeros((df.shape[0], df.shape[0]))
    if embeddings == 1:
        cos_matrix = cal_cosine_sim(df)
    elif embeddings == 2:
        cos_matrix = cal_cosine_bert_tf(df)
    elif embeddings == 3:
        cos_matrix = cal_cosine_use(df)
    else:
        print("invalid id for embeddings \n")
    for k in range(100):
        for i in range(len(df)):
            inspect = find_closest_match_v2(df, cos_matrix, i, 0)
            if float(inspect.iloc[0].Cos_sim) >= k / 100:
                df.loc[i, 'y_hat'] = 'Yes'
            else:
                df.loc[i, 'y_hat'] = 'No'
        y_true = df.conflict.values
        y_pred = df.y_hat.values
        data = confusion_matrix(y_true, y_pred)
        df_cm = pd.DataFrame(data, columns=np.unique(y_true), index=np.unique(y_true))
        df_cm.index.name = 'Actual'
        df_cm.columns.name = 'Predicted'
        tpr = df_cm.iloc[1, 1] / (df_cm.iloc[1, 1] + df_cm.iloc[1, 0])  # TP / (TP + FN)
        fpr = df_cm.iloc[0, 1] / (df_cm.iloc[0, 1] + df_cm.iloc[0, 0])  # FP / (FP + TN)
        TPR.append(tpr)
        FPR.append(fpr)
        cos_dict[k] = [tpr, fpr]
    plot_roc_curve(FPR, TPR)
    return cos_dict
def find_optimal_cutoff_withroc(cos_dict):
    # Pick the threshold where TPR crosses 1 - FPR, i.e. where sensitivity
    # equals specificity on the sweep produced by find_conflict_detect.
    fpr = []
    tpr = []
    threshold = []
    for key, val in cos_dict.items():
        threshold.append(key / 100)
        tpr.append(val[0])
        fpr.append(val[1])
    i = np.arange(len(tpr))
    tf = np.array(tpr) - (1 - np.array(fpr))
    roc = pd.DataFrame({'tf': pd.Series(tf, index=i),
                        'threshold': pd.Series(threshold, index=i)})
    roc_t = roc.iloc[(roc.tf - 0).abs().argsort()[:1]]
    return float(roc_t['threshold'].iloc[0])
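# A small sketch of the threshold pick: feed a hand-made sweep dict shaped
# like find_conflict_detect's output ({cutoff*100: [tpr, fpr]}) and recover
# the cutoff where TPR meets 1 - FPR (0.20 for this synthetic curve).
def _demo_threshold_pick():
    fake_sweep = {k: [max(0.0, 1 - k / 60), max(0.0, 1 - k / 30)] for k in range(100)}
    print("chosen threshold:", find_optimal_cutoff_withroc(fake_sweep))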
def test_cutoff(test_df, cutoff, embeddings=0):
    # Apply the chosen cosine cutoff to the test split, report the fold's
    # scores, and return the scored frame plus the 'Yes' conflict candidates.
    temp = []
    df = test_df.copy()
    df = df.reset_index(drop=True)
    cos_matrix = np.zeros((df.shape[0], df.shape[0]))
    if embeddings == 1:
        cos_matrix = cal_cosine_sim(df)
    elif embeddings == 2:
        cos_matrix = cal_cosine_bert_tf(df)
    elif embeddings == 3:
        cos_matrix = cal_cosine_use(df)
    else:
        print("invalid id for embeddings \n")
    for i in range(len(df)):
        inspect = find_closest_match_v2(df, cos_matrix, i, 0)
        temp.append('Yes' if float(inspect.iloc[0].Cos_sim) >= cutoff else 'No')
    df['y_hat'] = temp
    print(classification_report(df.conflict.values, df.y_hat.values, digits=6))
    print("********** Confusion Matrix for this fold *************\n")
    cf = confusion_matrix(df.conflict.values, df.y_hat.values)
    print(cf)
    df_cm = pd.DataFrame(cf, columns=np.unique(df.conflict.values), index=np.unique(df.conflict.values))
    df_cm.index.name = 'Actual'
    df_cm.columns.name = 'Predicted'
    tpr = df_cm.iloc[1, 1] / (df_cm.iloc[1, 1] + df_cm.iloc[1, 0])
    print("The tpr for this fold is :", tpr)
    candidate_set = df.loc[df['y_hat'] == 'Yes']
    return df, candidate_set
def count_unique_nouns(requirement):
    # Return the set of non-verb tokens (nouns, given pos_tags_req's tag
    # filter) in a requirement; despite the name, callers use the set itself.
    noun_counter = []
    tagged_requirement = pos_tags_req(requirement)
    for word, tag in tagged_requirement:
        if 'V' not in tag:  # skip the verb tags that pos_tags_req keeps
            noun_counter.append(word)  # lemmatization could improve generalization
    return set(noun_counter)
def count_similar_nouns(requirement, conflict_candidate):
    # Count noun occurrences shared between a requirement and a conflict
    # candidate; every matching noun pair contributes one count.
    similar_nouns_counter = []
    tagged_requirement = pos_tags_req(requirement)
    tagged_conflict_candidate = pos_tags_req(conflict_candidate)
    for word, tag in tagged_requirement:
        if 'V' not in tag:  # skip the verb tags that pos_tags_req keeps
            for conflict_word, conflict_tag in tagged_conflict_candidate:
                if word == conflict_word and 'V' not in conflict_tag:
                    similar_nouns_counter.append((word, conflict_word))
    return len(similar_nouns_counter)
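# A sketch of the noun-overlap primitives on two made-up requirements: the
# first call lists the (non-verb) nouns nltk finds, the second counts the
# noun occurrences the pair shares. Needs the punkt and tagger resources
# downloaded at the top of this module.
def _demo_noun_overlap():
    a = "The system shall encrypt the user password."
    b = "The system shall store the user password in plain text."
    print(count_unique_nouns(a))       # e.g. {'system', 'user', 'password'}
    print(count_similar_nouns(a, b))   # shared noun occurrences across a and b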
def get_all_noun_count(con_sim, i):
    # Noun sets for each of the six most-similar requirements of row i.
    cols = ['similar_%d' % k for k in range(1, 7)]
    return [count_unique_nouns(str(con_sim[c].iloc[i])) for c in cols]
def get_all_similar_noun_count(con_sim, i):
    # Total noun overlap of each of the six similar requirements with
    # similar_1 (the first slot, which holds the closest match and so also
    # contributes its self-overlap).
    cols = ['similar_%d' % k for k in range(1, 7)]
    anchor = str(con_sim['similar_1'].iloc[i])
    return sum(count_similar_nouns(str(con_sim[c].iloc[i]), anchor) for c in cols)
def get_similar_noun_count(con_sim):
    # Column of per-row shared-noun totals (see get_all_similar_noun_count).
    similar_noun_column = []
    for i in range(con_sim.shape[0]):
        similar_noun_column.append(get_all_similar_noun_count(con_sim, i))
    return similar_noun_column
def get_unique_noun_count(con_sim):
    # Column of per-row sizes of the union of noun sets across the six
    # similar requirements.
    unique_noun_column = []
    for i in range(con_sim.shape[0]):
        noun_sets = get_all_noun_count(con_sim, i)
        union = set()
        for noun_set in noun_sets:
            union.update(noun_set)
        unique_noun_column.append(len(union))
    return unique_noun_column
def get_ratio(df):
    # Second-stage rule: label a candidate 'Yes' when its shared-noun total
    # is at least as large as its unique-noun count (noun_ratio >= 1).
    con_sim = df.copy()
    con_sim = con_sim.reset_index(drop=True)
    con_sim['similar_noun'] = get_similar_noun_count(con_sim)
    con_sim['unique_noun'] = get_unique_noun_count(con_sim)
    con_sim['noun_ratio'] = con_sim['similar_noun'] / con_sim['unique_noun']
    con_sim['second_label'] = 'No'
    con_sim.loc[con_sim['noun_ratio'] >= 1, 'second_label'] = 'Yes'
    return con_sim
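# A hedged sketch of the second-stage rule: a single-row frame whose six
# 'similar_*' cells repeat one sentence, so the shared-noun total exceeds
# the unique-noun count and the row is relabelled 'Yes'.
def _demo_noun_ratio():
    sentence = "The system shall log user login attempts."
    toy = pd.DataFrame([{('similar_%d' % k): sentence for k in range(1, 7)}])
    print(get_ratio(toy)[['similar_noun', 'unique_noun', 'noun_ratio', 'second_label']])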
def get_dict_withid_req(df):
    # Map each requirement id to its text ({idx: requirement}).
    i_dict = {}
    for i in range(df.shape[0]):
        i_dict[df.iloc[i].idx] = df.iloc[i].requirement
    return i_dict
def cosine_sim(text1, text2):
    # TF-IDF rows from fit_transform are L2-normalised, so the dot product
    # of the two rows is their cosine similarity.
    vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, 4))
    tfidf = vectorizer.fit_transform([text1, text2])
    return (tfidf @ tfidf.T).toarray()[0, 1]
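# A one-off sanity check of the pairwise helper; related phrasings score
# well above zero, identical texts score ~1.0.
def _demo_pairwise_cosine():
    print(cosine_sim("display the battery level",
                     "the battery level shall be displayed"))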
def get_cosine_all(con_dict, all_req):
    # Cosine similarity of every conflict candidate against every requirement.
    # Note cosine_sim refits a vectorizer per pair, so this costs O(n*m) fits.
    final_dict = defaultdict(list)
    for key1, val1 in con_dict.items():
        temp_list = []
        for key2, val2 in all_req.items():
            temp_list.append((key2, cosine_sim(val1, val2)))
        final_dict[key1].append(temp_list)
    return final_dict
def get_req_sorted(final_dict):
    # For each conflict candidate keep the ids of its six most similar
    # requirements, ranked by cosine similarity (descending).
    best_dict = {}
    for key in final_dict:
        top = sorted(final_dict[key][0], key=itemgetter(1), reverse=True)[:6]
        best_dict[key] = [pair[0] for pair in top]
    return best_dict
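# A sketch of the retrieval step: score one candidate against a small
# requirement pool and keep the ids of its closest matches (up to six).
def _demo_top_matches():
    pool = {1: "log every user login attempt",
            2: "record all user login attempts",
            3: "display the current battery level"}
    cand = {1: "log every user login attempt"}
    print(get_req_sorted(get_cosine_all(cand, pool)))  # e.g. {1: [1, 2, 3]}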
def pos_tags_req(text):
    # Tokenise, PoS-tag, and keep only noun and selected verb tags. The
    # bracket filter strips the "[" / "]" that appear when array-valued
    # cells are stringified upstream (see create_dataframe).
    keep_tags = {'NN', 'NNS', 'NNP', 'NNPS', 'VBD', 'VBN', 'VBP', 'VBZ'}
    tokens = nltk.word_tokenize(text)
    tagged = nltk.pos_tag(tokens)
    tags = [(word, tag) for word, tag in tagged
            if word not in ["[", "]"] and tag in keep_tags]
    return list(tags)
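# A quick look at the tag filter in isolation; exact tags depend on the nltk
# tagger version, but nouns such as 'system' and 'fields' should survive.
def _demo_pos_tags():
    print(pos_tags_req("The system shall validate the input fields."))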
def pos_tags_req_spacy(text):
    # Debug helper: print spaCy PoS tags. Imported lazily so the module does
    # not require spaCy unless this helper is used; loads the model per call.
    import spacy
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(text)
    for token in doc:
        print(token, token.pos_)
def create_dataframe(all_req_df, candidate, best_dict):
    # One row per conflict candidate holding its six most similar requirement
    # texts. Cells store array values, which is why the noun-count helpers
    # str()-ify them and pos_tags_req filters out the stray brackets.
    columns = ['conflict_candidate'] + ['similar_%d' % k for k in range(1, 7)]
    con_sim = pd.DataFrame(columns=columns, index=range(candidate.shape[0]))
    con_sim['conflict_candidate'] = list(best_dict.keys())
    for k in range(6):
        con_sim['similar_%d' % (k + 1)] = [
            all_req_df['requirement'][all_req_df['idx'] == v[k]].values
            for v in best_dict.values()
        ]
    con_sim['true_label'] = candidate.loc[candidate['y_hat'] == 'Yes', 'conflict'].values
    con_sim['predicted_label'] = candidate.loc[candidate['y_hat'] == 'Yes', 'y_hat'].values
    return con_sim
def final_conflict(req_df, candidate_set, test_df):
    # Second-stage filter: re-label each conflict candidate via the
    # noun-overlap ratio, write the result back into test_df['y_hat']
    # (in place), and report the fold's final scores.
    candidate = candidate_set.copy()
    con_re_dict = get_dict_withid_req(candidate)
    all_req_dict = get_dict_withid_req(req_df)
    con_original_cosine_all = get_cosine_all(con_re_dict, all_req_dict)
    con_best_dict = get_req_sorted(con_original_cosine_all)
    con_sim = create_dataframe(req_df, candidate, con_best_dict)
    final_df = get_ratio(con_sim)
    ids = final_df['conflict_candidate'].values
    for i in range(len(test_df)):
        l = test_df.iloc[i].idx
        if l in ids:
            k = final_df.index[final_df['conflict_candidate'] == l].to_list()
            test_df.loc[i, 'y_hat'] = final_df.iloc[k[0]].second_label
    print(classification_report(test_df.conflict.values, test_df.y_hat.values, digits=6))
    print("********** Confusion Matrix for this fold *************\n")
    print(confusion_matrix(test_df.conflict.values, test_df.y_hat.values))
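# A minimal end-to-end sketch on hypothetical toy data (the schema of 'idx',
# 'requirement', and 'conflict' columns is assumed from the functions above).
# TF-IDF embeddings (id 1) keep the run download-free; note that
# find_conflict_detect pops a matplotlib ROC window. A real run would
# substitute a labelled requirements dataset for the toy frame.
if __name__ == "__main__":
    toy = pd.DataFrame({
        'idx': [1, 2, 3, 4, 5, 6],
        'requirement': [
            "The system shall log every user login attempt.",
            "The system shall record all user login attempts.",
            "The UI shall display the current battery level.",
            "The device shall power off after ten idle minutes.",
            "The system shall encrypt stored passwords.",
            "Stored passwords shall be encrypted by the system.",
        ],
        'conflict': ['Yes', 'Yes', 'No', 'No', 'Yes', 'Yes'],
    })
    sweep = find_conflict_detect(toy, embeddings=1)
    cutoff = find_optimal_cutoff_withroc(sweep)
    print("chosen cosine cutoff:", cutoff)
    scored, candidates = test_cutoff(toy, cutoff, embeddings=1)
    if not candidates.empty:
        final_conflict(toy, candidates, scored)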