#!/usr/bin/env python
# -*- coding: utf-8 -*-
######################################################################
#
# (c) Copyright University of Southampton, 2021
#
# Copyright in this software belongs to University of Southampton,
# Highfield, University Road, Southampton SO17 1BJ
#
# Created By : Stuart E. Middleton
# Created Date : 2021/01/29
# Project : Teaching
#
######################################################################
from __future__ import absolute_import, division, print_function, unicode_literals

import sys, codecs, json, math, time, warnings, re, string, logging

warnings.simplefilter(action='ignore', category=FutureWarning)

import nltk, numpy, scipy, sklearn, sklearn_crfsuite, sklearn_crfsuite.metrics

LOG_FORMAT = ('%(levelname) -s %(asctime)s %(message)s')
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger.info('logging started')


def exec_ner(file_chapter=None, ontonotes_file=None):

    # CHANGE CODE BELOW TO TRAIN A NER MODEL AND/OR USE REGEX TO GENERATE A SET OF BOOK CHARACTERS AND FILTERED SET OF NE TAGS (task 4)

    # Input >> www.gutenberg.org sourced plain text file for a chapter of a book
    # Output >> characters.txt = plain text set of extracted character names, one line per character name.

    # only the allowed type for task 4 (PERSON) will be serialized

    # CRF-based extraction: get_names() trains a CRF on OntoNotes and tags the chapter
    dict1 = get_names(file_chapter, ontonotes_file)

    # keep only CRF names that end in an ASCII letter (drops entries ending in punctuation)
    new_people = []
    for person in dict1.get("PERSON", []):
        if person.endswith(tuple(string.ascii_letters)):
            new_people.append(person)

    dictNE = {"PERSON": []}

    # re-join the chapter into paragraph-style lines (blank lines delimit paragraphs)
    lines = []
    current = ""
    with open(file_chapter, "r", encoding="utf8") as chapter_handle:
        for line in chapter_handle:
            if line == "\n" or line == "\r\n":
                lines.append(current)
                current = ""
            else:
                current += line.replace("\n", " ").replace("\r", "")
    if current:
        lines.append(current)  # keep the final paragraph if the file does not end with a blank line
    lines = [line.strip() for line in lines]
    book = "\n".join(lines)

    # regex-based extraction of likely character names (honorifics, initials,
    # quoted-speech attribution, possessives and capitalised bigrams)
    matches = re.findall(
        r"((?:Mrs?\.|Miss) [A-Z]\w+)|([A-Z]\. [A-Z]\w+)|(?:(?:’|'|\") said ((?:Mrs?\.|Miss) [A-Z]\w+))|(?:(?:’|'|\") said ([A-Z]\w+))|([A-Z]\w+)(?:’|')s|(?: ([A-Z][a-z]+ [A-Z][a-z]+) )",
        book, re.MULTILINE)
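    # Illustrative (hypothetical) surface forms the alternatives above are meant to catch;
    # these examples are made up for the sketch, not taken from the chapter:
    #   "Mrs. Bennet"        -> honorific + capitalised word
    #   "J. Watson"          -> initial + capitalised word
    #   "' said Elizabeth"   -> speaker of quoted speech
    #   "Darcy's"            -> possessive form (captures "Darcy")
    #   "Emma Woodhouse"     -> capitalised bigram surrounded by spaces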
    # keep a regex match only if every token in it is POS-tagged as a noun
    for match in matches:
        for x in match:
            if x:
                text = nltk.word_tokenize(x)
                tags = nltk.pos_tag(text)
                include = True
                for _, tag in tags:
                    if not tag.startswith("NN"):
                        include = False
                if include:
                    dictNE["PERSON"].append(x.lower().strip())

    # merge the CRF-derived names with the regex-derived names and de-duplicate
    dictNE["PERSON"] += dict1.get("PERSON", [])
    dictNE["PERSON"] += [n.lower().strip() for n in new_people]
    dictNE["PERSON"] = list(set(dictNE["PERSON"]))

    # DO NOT CHANGE THE BELOW CODE WHICH WILL SERIALIZE THE ANSWERS FOR THE AUTOMATED TEST HARNESS TO LOAD AND MARK

    # write out all PERSON entries for character list for subtask 4
    writeHandle = codecs.open('characters.txt', 'w', 'utf-8', errors='replace')
    if 'PERSON' in dictNE:
        for strNE in dictNE['PERSON']:
            writeHandle.write(strNE.strip().lower() + '\n')
    writeHandle.close()


# gazetteer of first names from the NLTK names corpus, used as a CRF feature
names = set(nltk.corpus.names.words())


def get_names(file_chapter="eval_chapter.txt", ontonotes_file="ontonotes_parsed.json"):
    dictNE = {}

    # load parsed ontonotes dataset
    readHandle = codecs.open(ontonotes_file, 'r', 'utf-8-sig', errors='replace')
    str_json = readHandle.read()
    readHandle.close()
    dict_ontonotes = json.loads(str_json)

    list_files = list(dict_ontonotes.keys())

    # build (token, POS, IOB) training sentences from OntoNotes
    training_list = create_train_list(list_files, dict_ontonotes)
    # print(len(training_list))

    # split sentences into those containing at least one entity and those that are all 'O',
    # then cap both slices (the all-'O' sentences are capped relative to the entity-bearing count)
    new_list = []
    new_list_2 = []
    for sentence in training_list:
        found = False
        for word, postag, iob in sentence:
            if iob != "O":
                found = True
        if found:
            new_list.append(sentence)
        else:
            new_list_2.append(sentence)
    training_list = new_list[:round(len(new_list) * 0.75)] + new_list_2[:round(len(new_list) * 0.75)]
    # print(len(training_list))

    stops = set(nltk.corpus.stopwords.words("english"))  # (currently unused)

    # build per-sentence feature dicts and label sequences for the CRF
    X_train = []
    Y_train = []
    for sent in training_list:
        features1 = []
        new_sent = []
        new_labels = []
        for word, postag, label in sent:
            # word = re.sub(r'[^\w\s]','', word)
            new_sent.append((word, postag, label))
            new_labels.append(label)
        for i in range(len(new_sent)):
            features = task2_word2features(sent, i)
            features1.append(features)
        if features1:
            X_train.append(features1)
            Y_train.append(new_labels)

    # X_train = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in training_list]
    # Y_train = [[label for _, _, label in sent] for sent in training_list]

    labels = ['I-DATE', 'B-DATE', 'I-CARDINAL', 'B-CARDINAL', 'I-ORDINAL', 'B-ORDINAL', 'I-NORP', 'B-NORP']  # (currently unused)

    # train CRF
    crf = sklearn_crfsuite.CRF(
        algorithm="lbfgs",
        c1=100,
        c2=0.1,
        max_iterations=150,
        all_possible_transitions=True,
    )
    crf.fit(X_train, Y_train)
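    # Optional sanity check (illustrative sketch only, not executed here): hold back a slice of
    # the training sentences as X_dev / Y_dev before fitting and report label-wise F1, e.g.
    #
    #   y_dev_pred = crf.predict(X_dev)
    #   print(sklearn_crfsuite.metrics.flat_f1_score(Y_dev, y_dev_pred, average='weighted',
    #                                                labels=[l for l in crf.classes_ if l != 'O']))
    #
    # X_dev / Y_dev are hypothetical names for such a held-out split; they are not built above.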
    # read the evaluation chapter and re-join it into paragraph-style lines
    with open(file_chapter, "r", encoding="utf-8-sig") as chapter_handle:
        file_lines = chapter_handle.readlines()
    lines = []
    current = ""
    for line in file_lines:
        if line == "\n" or line == "\r\n":
            lines.append(current)
            current = ""
        else:
            current += line.replace("\n", " ").replace("\r", "")
    if current:
        lines.append(current)  # keep the final paragraph if the file does not end with a blank line
    lines = [line.strip() for line in lines]

    # tokenize and POS tag the chapter lines, then build CRF features for each sentence
    test_data = create_test_list(lines)

    X_test = []
    used_sents = []
    for sent in test_data:
        features1 = []
        new_sent = []
        for word, postag in sent:
            # word = re.sub(r'[^\w\s]', '', word)
            new_sent.append((word, postag))
        for i in range(len(new_sent)):
            features = task2_word2features(sent, i)
            features1.append(features)
        if features1:
            X_test.append(features1)
            used_sents.append(new_sent)
    test_data = used_sents

    # X_test = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in test_data]

    Y_pred = crf.predict(X_test)

    # walk the predicted IOB labels and collect contiguous entity spans into dictNE
    current_one = ("", [])
    for i, label_list in enumerate(Y_pred):
        for j, label in enumerate(label_list):
            if label.startswith("B-"):
                # a new entity starts, so flush the one being built (if any)
                if current_one[0] != "" and current_one[0] not in dictNE:
                    dictNE[current_one[0]] = []
                if current_one[0] in dictNE:
                    dictNE[current_one[0]].append(" ".join(current_one[1]).strip().lower())
                ne_type = label[2:]
                current_one = (ne_type, [])
                current_one[1].append(test_data[i][j][0])
            elif label.startswith("I-"):
                ne_type = label[2:]
                current_one[1].append(test_data[i][j][0])
    # flush the final entity so the last prediction in the chapter is not dropped
    if current_one[0] != "":
        if current_one[0] not in dictNE:
            dictNE[current_one[0]] = []
        dictNE[current_one[0]].append(" ".join(current_one[1]).strip().lower())

    listKeys = list(dictNE.keys())
    for strKey in listKeys:
        dictNE[strKey] = list(set(dictNE[strKey]))

    # keep only the entity types allowed for task 4, lowercased and stripped
    listAllowedTypes = ["PERSON"]
    listKeys = list(dictNE.keys())
    for strKey in listKeys:
        for nIndex in range(len(dictNE[strKey])):
            dictNE[strKey][nIndex] = dictNE[strKey][nIndex].strip().lower()
        if strKey not in listAllowedTypes:
            del dictNE[strKey]

    return dictNE
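# Illustrative example (hypothetical sentence, not executed): for
# sent = [("Mr.", "NNP"), ("Darcy", "NNP"), ("smiled", "VBD")], the feature extractor
# defined below, task2_word2features(sent, 1), returns a dict including entries such as
#   {'word': 'Darcy', 'word.lower()': 'darcy', 'postag': 'NNP', 'word.shape': 'Xx',
#    'word.stem': <Porter stem of 'Darcy'>, '-1:word': 'Mr.', '+1:word': 'smiled', ...}
# plus the same fields for the -2/+2 context tokens where they exist, and BOS/EOS flags
# at sentence boundaries.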
def word_shape(word):
    # collapse each run of character classes into a single symbol,
    # e.g. "Darcy" -> "Xx", "1815" -> "d", "Mr." -> "Xx."; other characters are kept as-is
    shape = ""
    for c in word:
        if c.isupper():
            if not shape or shape[-1] != "X":
                shape += "X"
        elif c.islower():
            if not shape or shape[-1] != "x":
                shape += "x"
        elif c.isdigit():
            if not shape or shape[-1] != "d":
                shape += "d"
        else:
            shape += c
    return shape


def task2_word2features(sent, i):
    # lem = nltk.stem.wordnet.WordNetLemmatizer()  # (currently unused)
    stem = nltk.stem.porter.PorterStemmer()
    word = sent[i][0]
    postag = sent[i][1]

    features = {
        'word': word,
        "word.lower()": word.lower(),
        'postag': postag,
        # token shape
        "word.shape": word_shape(word),
        # token suffix
        "word.stem": stem.stem(word),
        # gazetteer flag: 1 if the token is a known first name
        "word.name": 1 if word in names else 0
    }

    if i > 0:
        word_prev = sent[i - 1][0]
        postag_prev = sent[i - 1][1]
        features.update({
            '-1:word': word_prev,
            "-1:word.lower()": word_prev.lower(),
            '-1:postag': postag_prev,
            "-1:word.shape": word_shape(word_prev),
            "-1:word.stem": stem.stem(word_prev),
            "-1:word.name": 1 if word_prev in names else 0
        })
    else:
        features['BOS'] = True

    if i > 1:
        word_prev = sent[i - 2][0]
        postag_prev = sent[i - 2][1]
        features.update({
            '-2:word': word_prev,
            "-2:word.lower()": word_prev.lower(),
            '-2:postag': postag_prev,
            "-2:word.shape": word_shape(word_prev),
            "-2:word.stem": stem.stem(word_prev),
            "-2:word.name": 1 if word_prev in names else 0
        })

    if i < len(sent) - 1:
        word_next = sent[i + 1][0]
        postag_next = sent[i + 1][1]
        features.update({
            '+1:word': word_next,
            "+1:word.lower()": word_next.lower(),
            '+1:postag': postag_next,
            "+1:word.shape": word_shape(word_next),
            "+1:word.stem": stem.stem(word_next),
            "+1:word.name": 1 if word_next in names else 0
        })
    else:
        features['EOS'] = True

    if i < len(sent) - 2:
        word_next = sent[i + 2][0]
        postag_next = sent[i + 2][1]
        features.update({
            '+2:word': word_next,
            "+2:word.lower()": word_next.lower(),
            '+2:postag': postag_next,
            "+2:word.shape": word_shape(word_next),
            "+2:word.stem": stem.stem(word_next),
            "+2:word.name": 1 if word_next in names else 0
        })

    return features


def create_train_list(data, dict_ontonotes):
    # sent = (tokens, pos, IOB_label)
    list_train = []
    for str_file in data:
        for str_sent_index in dict_ontonotes[str_file]:
            dict_sent = dict_ontonotes[str_file][str_sent_index]

            # ignore sents with non-PENN POS tags
            if 'XX' in dict_sent['pos']:
                continue
            if 'VERB' in dict_sent['pos']:
                continue

            list_entry = []

            # compute IOB tags for named entities (if any)
            ne_type_last = None
            for nTokenIndex in range(len(dict_sent['tokens'])):
                strToken = dict_sent['tokens'][nTokenIndex]
                strPOS = dict_sent['pos'][nTokenIndex]
                ne_type = None
                if 'ne' in dict_sent:
                    dict_ne = dict_sent['ne']
                    if 'parse_error' not in dict_ne:
                        for str_NEIndex in dict_ne:
                            if nTokenIndex in dict_ne[str_NEIndex]['tokens']:
                                ne_type = dict_ne[str_NEIndex]['type']
                                break
                if ne_type is not None and ne_type in ["PERSON"]:
                    if ne_type == ne_type_last:
                        strIOB = 'I-' + ne_type
                    else:
                        strIOB = 'B-' + ne_type
                else:
                    strIOB = 'O'
                ne_type_last = ne_type

                list_entry.append((strToken, strPOS, strIOB))

            list_train.append(list_entry)
    return list_train


# from stackoverflow (currently unused helper)
def get_wordnet_pos(treebank_tag):
    if treebank_tag.startswith('J'):
        return nltk.corpus.wordnet.ADJ
    elif treebank_tag.startswith('V'):
        return nltk.corpus.wordnet.VERB
    elif treebank_tag.startswith('N'):
        return nltk.corpus.wordnet.NOUN
    elif treebank_tag.startswith('R'):
        return nltk.corpus.wordnet.ADV
    else:
        return None


def create_test_list(lines):
    # tokenize and POS tag each non-empty line, returning a list of [(token, POS), ...] sentences
    out_sentences = []
    for line in lines:
        if line:
            text = nltk.word_tokenize(line)
            pos_tags = nltk.pos_tag(text)
            out_sentences.append(pos_tags)
    return out_sentences


if __name__ == '__main__':
    if len(sys.argv) < 4:
        raise Exception('missing command line args : ' + repr(sys.argv))
    ontonotes_file = sys.argv[1]
    book_file = sys.argv[2]
    chapter_file = sys.argv[3]

    logger.info('ontonotes = ' + repr(ontonotes_file))
    logger.info('book = ' + repr(book_file))
    logger.info('chapter = ' + repr(chapter_file))

    # DO NOT CHANGE THE CODE IN THIS FUNCTION
    exec_ner(chapter_file, ontonotes_file)
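# Example invocation (the script and book file names below are placeholders; the parsed
# OntoNotes JSON and chapter file names match the defaults used by get_names above):
#
#   python task4_ner.py ontonotes_parsed.json book.txt eval_chapter.txt
#
# On success the extracted character names are written, one lowercased name per line,
# to characters.txt in the current working directory.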