"""utils_conflict_unsupervised.py: helper functions for unsupervised requirement-conflict detection."""
import pandas as pd
import numpy as np
import tensorflow_hub as hub
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.tokenize import RegexpTokenizer
from gensim.corpora import Dictionary
from sentence_transformers import SentenceTransformer, util
import matplotlib.pyplot as plt
import torch
import nltk
from collections import defaultdict
from operator import itemgetter
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
from scipy.spatial.distance import cosine
from transformers import AutoModel, AutoTokenizer
import spacy  # needed by pos_tags_req_spacy
import seaborn as sns
from collections import Counter
from wordcloud import WordCloud
from sklearn.cluster import AgglomerativeClustering
from sklearn.neighbors import kneighbors_graph
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import warnings
warnings.filterwarnings("ignore")

# Declarations: load the Universal Sentence Encoder (USE) once at import time.
module_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
USE_model = hub.load(module_url)




def cal_cosine_bert_tf(df):
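  """Pairwise cosine-similarity matrix from concatenated Sentence-BERT and TF-IDF (1-4 gram) vectors."""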
  print("bert embeddings\n")
  model_nli_means_tokens= SentenceTransformer('sentence-transformers/all-distilroberta-v1')
  BERT_open_coss= model_nli_means_tokens.encode(df.requirement)
  tfidfconvert= TfidfVectorizer(ngram_range=(1,4)).fit(df.requirement)
  open_cos_tfidf=tfidfconvert.transform(df.requirement).todense()
  BERT_TFIDF_open_coss=np.hstack((BERT_open_coss, open_cos_tfidf))
  open_coss_cos_sim_matrix_cluster_0=cosine_similarity(BERT_TFIDF_open_coss,BERT_TFIDF_open_coss)
  return open_coss_cos_sim_matrix_cluster_0

def cal_cosine_use(df):
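  """Pairwise cosine-similarity matrix from Universal Sentence Encoder embeddings."""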
  print("Universal sentence encoder \n")
  USE_open_coss = USE_model(df.requirement.tolist())
  USE_open_coss_cos_sim_matrix = cosine_similarity(USE_open_coss, USE_open_coss)
  return USE_open_coss_cos_sim_matrix

def cal_cosine_sim(df):
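  """Pairwise cosine-similarity matrix from TF-IDF (1-4 gram) vectors."""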
  print("TFIDF embeddings \n")
  tf_idf = TfidfVectorizer(ngram_range=(1, 4)).fit(df.requirement)
  tf_idf_vector = tf_idf.transform(df.requirement).toarray()  # dense ndarray instead of np.matrix
  cos_sim = cosine_similarity(tf_idf_vector, tf_idf_vector)
  return cos_sim

def find_closest_match_v2(df, cos_sim_matrix, lookup, cluster):
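    """Rank all other requirements by cosine similarity to row `lookup` (the `cluster` argument is unused)."""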
    cos_sim = []
    text = []
    idx = []
    for i in range(len(cos_sim_matrix)):
        # Skip (near-)exact matches, i.e. the requirement itself.
        if cos_sim_matrix[lookup, i] < 0.99995:
            cos_sim.append(cos_sim_matrix[lookup, i])
            text.append(df.requirement.iloc[i])
            idx.append(df.idx.iloc[i])
    # Build the frame column-wise so Cos_sim keeps a numeric dtype and sorts numerically.
    generated_df = pd.DataFrame({'idx': idx, 'Cos_sim': cos_sim, 'text': text})
    generated_df.sort_values(by='Cos_sim', ascending=False, inplace=True)
    generated_df.reset_index(drop=True, inplace=True)
    return generated_df

def plot_roc_curve(fpr, tpr):
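    """Plot an ROC curve from lists of false-positive and true-positive rates."""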
    plt.subplots(figsize=(8,6))
    #sns.set(rc={"grid.linewidth": 0.2})
    sns.set_context("paper", font_scale=2.0)
    plt.plot(fpr, tpr, color='orange', label='ROC')
    plt.plot([0, 1], [0, 1], color='darkblue', linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    #plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.tight_layout()
    plt.legend()
    plt.show()


def find_optimal_cutoff(df,embeddings):
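  """Sweep cosine-similarity cutoffs 0.00-0.99 on the training set and return the cutoff (in hundredths) with the highest macro-averaged recall."""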
  print("Training :", df.shape)
  f1score_w=[]
  if embeddings == 'tfidf':
    cos_matrix = cal_cosine_sim(df)
  elif embeddings == 'bert':
    cos_matrix = cal_cosine_bert_tf(df)
  elif embeddings == 'use':
    cos_matrix = cal_cosine_use(df)
  else:
    raise ValueError("embeddings must be one of 'tfidf', 'bert' or 'use'")
  df['y_hat'] = 0
  for j in range(100):
    temp = []
    for i in range(len(df)):
      inspect=find_closest_match_v2(df, cos_matrix, i,0)
      if float(inspect.iloc[0].Cos_sim)>j/100:
        temp.append('Yes')
      else:
        temp.append('No')
    se = pd.Series(temp)
    df['y_hat'] = se.values
    f1score_w.append(precision_recall_fscore_support(df.conflict.values, df.y_hat.values, average='macro'))
  f1score_w = np.array(f1score_w)
  return np.where(f1score_w[:,1] == f1score_w[:,1].max())[0][0]

def find_conflict_detect(train_df,embeddings = 0):
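  """Sweep cutoffs 0.00-0.99, record (TPR, FPR) at each cutoff, plot the ROC curve, and return {cutoff: [tpr, fpr]}."""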
  df = train_df.copy()
  df = df.reset_index(drop=True)
  #print(df.shape)
  FPR = []
  TPR = []
  cos_dict = {}
  df['y_hat'] = 0
  #print(df.head(5))
  #print("df shape :\n",df.shape)
  cos_matrix = np.zeros((df.shape[0],df.shape[0]))
  if embeddings == 1:
    cos_matrix = cal_cosine_sim(df)
  elif embeddings == 2:
    cos_matrix = cal_cosine_bert_tf(df)
  elif embeddings == 3:
    cos_matrix = cal_cosine_use(df)
  else:
    raise ValueError("invalid embeddings id: use 1 (tfidf), 2 (bert) or 3 (use)")
  #print('shape of cos_matrix',cos_matrix.shape) 
  #print(type(cos_matrix)) 
  for k in range(100):
    for i in range(len(df)):
      #print(i)
      inspect=find_closest_match_v2(df,cos_matrix, i,0)
      if float(inspect.iloc[0].Cos_sim)>=k/100: 
        df.loc[i, 'y_hat']='Yes'
      else:
        df.loc[i, 'y_hat']='No'
    #print("df successful \n",df['y_hat'].value_counts())
    #print(df)
    
    y_true = df.conflict.values
    y_pred = df.y_hat.values
    #print(len(y_true),len(y_pred))
    data = confusion_matrix(y_true, y_pred)
    df_cm = pd.DataFrame(data, columns=np.unique(y_true), index = np.unique(y_true))
    df_cm.index.name = 'Actual'
    df_cm.columns.name = 'Predicted'
    tpr = df_cm.iloc[1,1] / (df_cm.iloc[1,1] + df_cm.iloc[1,0])
    fpr = df_cm.iloc[0,1] / (df_cm.iloc[0,1] + df_cm.iloc[0,0])
    TPR.append(tpr)
    FPR.append(fpr)
    cos_dict[k] = list([tpr,fpr])
  plot_roc_curve(FPR,TPR)
  return cos_dict
  

def find_optimal_cutoff_withroc(cos_dict):
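  """Pick the ROC threshold where the TPR is closest to 1 - FPR (sensitivity roughly equal to specificity)."""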
  fpr = []
  tpr = []
  threshold = []
  for key,val in cos_dict.items():
    threshold.append(key/100)
    tpr.append(val[0])
    fpr.append(val[1])
  i = np.arange(len(tpr))
  tf = np.array(tpr) - (1 - np.array(fpr))  # distance from the sensitivity == specificity point
  roc = pd.DataFrame({'tf': pd.Series(tf, index=i), 'threshold': pd.Series(threshold, index=i)})
  roc_t = roc.iloc[(roc.tf - 0).abs().argsort()[:1]]
  return float(roc_t['threshold'].iloc[0])


def test_cutoff(test_df,cutoff,embeddings = 0):
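  """Apply a fixed similarity cutoff to the test set, print metrics, and return the labelled frame plus the candidate conflict set."""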
  temp = []
  df = test_df.copy()
  df = df.reset_index(drop=True)
  cos_matrix = np.zeros((df.shape[0],df.shape[0]))
  if embeddings == 1:
    cos_matrix = cal_cosine_sim(df)
  elif embeddings == 2:
    cos_matrix = cal_cosine_bert_tf(df)
  elif embeddings == 3:
    cos_matrix = cal_cosine_use(df)
  else:
    raise ValueError("invalid embeddings id: use 1 (tfidf), 2 (bert) or 3 (use)")
  for i in range(len(df)):
    inspect=find_closest_match_v2(df,cos_matrix , i,0)
    if float(inspect.iloc[0].Cos_sim) >= cutoff:
      temp.append('Yes')
    else:
      temp.append('No')
  df['y_hat'] = temp
  print(classification_report(df.conflict.values, df.y_hat.values, digits=6))
  print("********** Confusion Matrix for this fold *************\n")
  cf = confusion_matrix(df.conflict.values, df.y_hat.values)
  print(cf)
  df_cm = pd.DataFrame(cf, columns=np.unique(df.conflict.values), index=np.unique(df.conflict.values))
  df_cm.index.name = 'Actual'
  df_cm.columns.name = 'Predicted'
  tpr = df_cm.iloc[1,1] / (df_cm.iloc[1,1] + df_cm.iloc[1,0])
  print("The tpr for this fold is :", tpr)
  candidate_set = df.loc[df['y_hat'] == 'Yes']
  return df, candidate_set

def count_unique_nouns(requirement):
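  """Return the set of unique non-verb (noun) tokens in a requirement."""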
  noun_counter=[]
  tagged_requirement= pos_tags_req(requirement)
  for word, tag in tagged_requirement:
    if 'V' not in tag: # Skip verbs - to be verified with nltk documentation
      noun_counter.append(word) # Consider lemmatization for better generalization
  return set(noun_counter)

def count_similar_nouns(requirement, conflict_candidate):
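  """Count the matching noun pairs between a requirement and a conflict candidate."""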

  similar_nouns_counter=[]
  tagged_requirement= pos_tags_req(requirement)
  tagged_conflict_candidate=pos_tags_req(conflict_candidate)

  for word, tag in tagged_requirement:
    if 'V' not in tag: # Skip verbs - to be verified with nltk PoS documentation
      for conflict_word, conflict_tag in tagged_conflict_candidate:
        if word==conflict_word and 'V' not in conflict_tag:
          similar_nouns_counter.append((word,conflict_word))
  return len(similar_nouns_counter)


def get_all_noun_count(con_sim,i):
  # Collect the unique-noun sets of the six requirements most similar to candidate row i.
  total_noun_count_list = []
  for k in range(1, 7):
    total_noun_count_list.append(count_unique_nouns(str(con_sim[f'similar_{k}'].iloc[i])))
  return total_noun_count_list

def get_all_similar_noun_count(con_sim,i):
  # Sum, over the six retrieved requirements, the nouns each shares with similar_1.
  total_noun_count = 0
  for k in range(1, 7):
    total_noun_count += count_similar_nouns(str(con_sim[f'similar_{k}'].iloc[i]), str(con_sim['similar_1'].iloc[i]))
  return total_noun_count

def get_similar_noun_count(con_sim):
  similar_noun_column = []
  for i in range(con_sim.shape[0]):
    similar_noun_column.append(get_all_similar_noun_count(con_sim,i))
  return similar_noun_column

def get_unique_noun_count(con_sim):
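  """For each candidate row, count the distinct nouns found across its six most similar requirements."""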
  unique_noun_column = []
  for i in range(con_sim.shape[0]):
    total_noun_count_list = get_all_noun_count(con_sim, i)
    total_unique_noun_set = []
    for noun_set in total_noun_count_list:  # avoid shadowing the row index i
      for noun in noun_set:
        total_unique_noun_set.append(noun)
    unique_noun_column.append(len(set(total_unique_noun_set)))
  return unique_noun_column

def get_ratio(df):
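  """Add similar/unique noun counts and their ratio; rows with noun_ratio >= 1 get second_label 'Yes'."""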
  con_sim = df.copy()
  con_sim = con_sim.reset_index(drop=True)
  con_sim['similar_noun'] = get_similar_noun_count(con_sim)
  con_sim['unique_noun'] = get_unique_noun_count(con_sim)
  con_sim['noun_ratio'] = con_sim['similar_noun']/con_sim['unique_noun']
  con_sim['second_label'] = 0
  for i in range(len(con_sim)):
    if float(con_sim.iloc[i].noun_ratio) >= 1:
      con_sim.loc[i, 'second_label']='Yes'
    else:
      con_sim.loc[i,'second_label'] = 'No'
  return con_sim

def get_dict_withid_req(df):
  i_dict = {}
  for i in range(df.shape[0]):
    i_dict[df.iloc[i].idx] = df.iloc[i].requirement
  return i_dict

def cosine_sim(text1, text2):
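    """TF-IDF cosine similarity between two texts (rows are L2-normalised, so the dot product is the cosine)."""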
    vectorizer = TfidfVectorizer(stop_words='english',ngram_range=(1,4))
    tfidf = vectorizer.fit_transform([text1, text2])
    return (tfidf * tfidf.T).toarray()[0, 1]

def get_cosine_all(con_dict,all_req):
  final_dict = defaultdict(list)
  for key1,val1 in con_dict.items():
    temp_list = []
    for key2,val2 in all_req.items():
      temp_list.append((key2,cosine_sim(con_dict[key1],all_req[key2])))
    final_dict[key1].append(temp_list)
  return final_dict

def get_req_sorted(final_dict):
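  """For every conflict candidate, keep the ids of its six most similar requirements."""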
  sorted_dict = {}
  best_dict = {}
  for key in final_dict:
    sorted_dict[key] = sorted(final_dict[key][0], key=itemgetter(1), reverse=True)
  for key, val in sorted_dict.items():
    sorted_dict[key] = val[:6]
  for key in sorted_dict:
    ids = []
    for j in range(len(sorted_dict[key])):
      ids.append(sorted_dict[key][j][0])
    best_dict[key] = ids
  return best_dict

def pos_tags_req(text):
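  """PoS-tag a requirement and keep only noun (NN, NNS, NNP, NNPS) and selected verb (VBD, VBN, VBP, VBZ) tags."""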
  tokens = nltk.word_tokenize(text)
  tagged = nltk.pos_tag(tokens)
  tags = [(word, tag) for word, tag in tagged if word not in ["[", "]"]]  # drop stray square brackets
  kept_tags = {'NN', 'NNS', 'NNP', 'NNPS', 'VBD', 'VBN', 'VBP', 'VBZ'}
  tags = [(word, tag) for word, tag in tags if tag in kept_tags]
  return tags

def pos_tags_req_spacy(text):
  nlp = spacy.load("en_core_web_sm")
  doc = nlp(text)
  for token in doc:
    print(token,token.pos_)

def create_dataframe(all_req_df,candidate, best_dict):
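  """Build a frame holding each conflict candidate, its six most similar requirements, and its true/predicted labels."""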

  columns = ['conflict_candidate','similar_1','similar_2','similar_3','similar_4','similar_5','similar_6']
  index = list(range(0,candidate.shape[0],1))
  con_sim = pd.DataFrame(columns=columns,index=index)
  con_sim['conflict_candidate'] = list(best_dict.keys())
  # Fill similar_1..similar_6 by looking up each retrieved requirement id in the full requirement set.
  similar_columns = {j: [] for j in range(1, 7)}
  for k, v in best_dict.items():
    for j in range(1, 7):
      similar_columns[j].append(all_req_df['requirement'][all_req_df['idx'] == v[j - 1]].values)
  for j in range(1, 7):
    con_sim[f'similar_{j}'] = similar_columns[j]
  con_sim['true_label'] = candidate[['conflict']][candidate['y_hat'] == 'Yes'].values
  con_sim['predicted_label'] = candidate[['y_hat']][candidate['y_hat'] == 'Yes'].values
  return con_sim

def final_conflict(req_df, candidate_set,test_df):
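  """Second-stage filter: relabel conflict candidates via the noun-overlap ratio and report metrics on the test set."""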
  candidate = candidate_set.copy()
  con_re_dict = get_dict_withid_req(candidate)
  all_req_dict = get_dict_withid_req(req_df)
  con_original_cosine_all = get_cosine_all(con_re_dict,all_req_dict)
  con_best_dict = get_req_sorted(con_original_cosine_all)
  con_sim = create_dataframe(req_df,candidate,con_best_dict)
  final_df = get_ratio(con_sim)
  ids = final_df['conflict_candidate'].values
  for i in range(len(test_df)):
    l = test_df.iloc[i].idx
    if l in ids:
      k = final_df.index[final_df['conflict_candidate'] == l].to_list()
      label = final_df.iloc[k[0]].second_label
      test_df.loc[i,'y_hat'] = label
  print(classification_report(test_df.conflict.values, test_df.y_hat.values, digits = 6))
  print("********** Confusion Matrix for this fold *************\n")
  print(confusion_matrix(test_df.conflict.values, test_df.y_hat.values))