#!/usr/bin/env python
# -*- coding: utf-8 -*-

######################################################################
#
# (c) Copyright University of Southampton, 2021
#
# Copyright in this software belongs to University of Southampton,
# Highfield, University Road, Southampton SO17 1BJ
#
# Created By : Stuart E. Middleton
# Created Date : 2021/01/29
# Project : Teaching
#
######################################################################

from __future__ import absolute_import, division, print_function, unicode_literals

import sys, codecs, json, math, time, warnings, re, logging

warnings.simplefilter(action='ignore', category=FutureWarning)

import nltk, numpy, scipy, sklearn, sklearn_crfsuite, sklearn_crfsuite.metrics

LOG_FORMAT = ('%(levelname) -s %(asctime)s %(message)s')
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger.info('logging started')


def exec_ner(file_chapter=None, ontonotes_file=None):

    # CHANGE CODE BELOW TO TRAIN A CRF NER MODEL TO TAG THE CHAPTER OF TEXT (task 3)
    # Input >> www.gutenberg.org sourced plain text file for a chapter of a book
    # Output >> ne.json = { <ne_type> : [ <phrase>, <phrase>, ... ] }

    # named entities found by the CRF tagger, keyed by type
    # only the allowed types for task 3 (DATE, CARDINAL, ORDINAL, NORP) will be serialized
    dictNE = {}

    # load parsed ontonotes dataset
    readHandle = codecs.open(ontonotes_file, 'r', 'utf-8-sig', errors='replace')
    str_json = readHandle.read()
    readHandle.close()
    dict_ontonotes = json.loads(str_json)

    # cap the number of training documents to keep training time manageable
    list_files = list(dict_ontonotes.keys())
    if len(list_files) > 1000:
        list_files = list_files[:1000]

    training_list = create_train_list(list_files, dict_ontonotes)

    X_train = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in training_list]
    Y_train = [[label for _, _, label in sent] for sent in training_list]

    # NE label set used for task 3 (kept for reference / evaluation)
    labels = ['I-DATE', 'B-DATE', 'I-CARDINAL', 'B-CARDINAL', 'I-ORDINAL', 'B-ORDINAL', 'I-NORP', 'B-NORP']

    # train CRF
    crf = sklearn_crfsuite.CRF(
        c1=100,
        c2=0.1,
        algorithm="lbfgs",
        max_iterations=50,
        all_possible_transitions=True,
    )
    crf.fit(X_train, Y_train)

    # read the chapter and merge consecutive non-blank lines into paragraph strings
    with open(file_chapter, "r", encoding="utf-8-sig") as chapterHandle:
        chapter_lines = chapterHandle.readlines()
    lines = []
    current = ""
    for line in chapter_lines:
        if line == "\n" or line == "\r\n":
            lines.append(current)
            current = ""
        else:
            current += line.replace("\n", " ").replace("\r", "")
    if current != "":
        # keep the final paragraph even if the file does not end with a blank line
        lines.append(current)
    lines = [line.strip() for line in lines]

    test_data = create_test_list(lines)
    X_test = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in test_data]
    Y_pred = crf.predict(X_test)

    # collect B-/I- labelled token runs into typed phrases
    current_one = ("", [])
    for i, label_list in enumerate(Y_pred):
        for j, label in enumerate(label_list):
            if label.startswith("B-"):
                if current_one[0] != "" and current_one[0] not in dictNE.keys():
                    dictNE[current_one[0]] = []
                if current_one[0] in dictNE.keys():
                    dictNE[current_one[0]].append(" ".join(current_one[1]).strip().lower())
                ne_type = label[2:]
                current_one = (ne_type, [])
                current_one[1].append(test_data[i][j][0])
            elif label.startswith("I-"):
                ne_type = label[2:]
                current_one[1].append(test_data[i][j][0])
    # flush the last entity so the final mention in the chapter is not dropped
    if current_one[0] != "":
        if current_one[0] not in dictNE.keys():
            dictNE[current_one[0]] = []
        dictNE[current_one[0]].append(" ".join(current_one[1]).strip().lower())

    # deduplicate phrases per entity type
    listKeys = list(dictNE.keys())
    for strKey in listKeys:
        dictNE[strKey] = list(set(dictNE[strKey]))

    # DO NOT CHANGE THE BELOW CODE WHICH WILL SERIALIZE THE ANSWERS FOR THE AUTOMATED TEST HARNESS TO LOAD AND MARK

    # FILTER NE dict by types required for task 3
    listAllowedTypes = ['DATE', 'CARDINAL', 'ORDINAL', 'NORP']
    listKeys = list(dictNE.keys())
    for strKey in listKeys:
        for nIndex in range(len(dictNE[strKey])):
            dictNE[strKey][nIndex] = dictNE[strKey][nIndex].strip().lower()
        if not strKey in listAllowedTypes:
            del dictNE[strKey]

    # write filtered NE dict
    writeHandle = codecs.open('ne.json', 'w', 'utf-8', errors='replace')
    strJSON = json.dumps(dictNE, indent=2)
    writeHandle.write(strJSON + '\n')
    writeHandle.close()


def task2_word2features(sent, i):
    lem = nltk.stem.wordnet.WordNetLemmatizer()
    stem = nltk.stem.porter.PorterStemmer()
    word = sent[i][0]
    postag = sent[i][1]
    postag2 = get_wordnet_pos(postag)

    features = {
        'word': word,
        'postag': postag,
        # token shape
        'word.lower()': word.lower(),
        'word.isupper()': word.isupper(),
        'word.istitle()': word.istitle(),
        'word.isdigit()': word.isdigit(),
        # token suffix
        'word.suffix': word.lower()[-3:],
        # POS prefix
        'postag[:2]': postag[:2],
        # lemma and stem
        'stem': stem.stem(word)
    }
    if postag2:
        features["lemma"] = lem.lemmatize(word, postag2)

    # features for the previous token (or a beginning-of-sentence marker)
    if i > 0:
        word_prev = sent[i - 1][0]
        postag_prev = sent[i - 1][1]
        postag2_prev = get_wordnet_pos(postag_prev)
        features.update({
            '-1:word.lower()': word_prev.lower(),
            '-1:postag': postag_prev,
            '-1:word.isupper()': word_prev.isupper(),
            '-1:word.istitle()': word_prev.istitle(),
            '-1:word.isdigit()': word_prev.isdigit(),
            '-1:word.suffix': word_prev.lower()[-3:],
            '-1:postag[:2]': postag_prev[:2],
            '-1:stem': stem.stem(word_prev)
        })
        if postag2_prev is not None:
            # lemmatize the previous word (not the current one) for the -1:lemma feature
            features["-1:lemma"] = lem.lemmatize(word_prev, postag2_prev)
    else:
        features['BOS'] = True

    # features for the next token (or an end-of-sentence marker)
    if i < len(sent) - 1:
        word_next = sent[i + 1][0]
        postag_next = sent[i + 1][1]
        postag2_next = get_wordnet_pos(postag_next)
        features.update({
            '+1:word.lower()': word_next.lower(),
            '+1:postag': postag_next,
            '+1:word.isupper()': word_next.isupper(),
            '+1:word.istitle()': word_next.istitle(),
            '+1:word.isdigit()': word_next.isdigit(),
            '+1:word.suffix': word_next.lower()[-3:],
            '+1:postag[:2]': postag_next[:2],
            '+1:stem': stem.stem(word_next)
        })
        if postag2_next is not None:
            # lemmatize the next word (not the current one) for the +1:lemma feature
            features["+1:lemma"] = lem.lemmatize(word_next, postag2_next)
    else:
        features['EOS'] = True

    return features


def create_train_list(data, dict_ontonotes):
    # sent = (tokens, pos, IOB_label)
    list_train = []
    for str_file in data:
        for str_sent_index in dict_ontonotes[str_file]:
            # ignore sents with non-PENN POS tags
            if 'XX' in dict_ontonotes[str_file][str_sent_index]['pos']:
                continue
            if 'VERB' in dict_ontonotes[str_file][str_sent_index]['pos']:
                continue

            list_entry = []

            # compute IOB tags for named entities (if any)
            ne_type_last = None
            for nTokenIndex in range(len(dict_ontonotes[str_file][str_sent_index]['tokens'])):
                strToken = dict_ontonotes[str_file][str_sent_index]['tokens'][nTokenIndex]
                strPOS = dict_ontonotes[str_file][str_sent_index]['pos'][nTokenIndex]
                ne_type = None
                if 'ne' in dict_ontonotes[str_file][str_sent_index]:
                    dict_ne = dict_ontonotes[str_file][str_sent_index]['ne']
                    if 'parse_error' not in dict_ne:
                        for str_NEIndex in dict_ne:
                            if nTokenIndex in dict_ne[str_NEIndex]['tokens']:
                                ne_type = dict_ne[str_NEIndex]['type']
                                break
                if ne_type is not None and ne_type in ["DATE", "ORDINAL", "CARDINAL", "NORP"]:
                    if ne_type == ne_type_last:
                        strIOB = 'I-' + ne_type
                    else:
                        strIOB = 'B-' + ne_type
                else:
                    strIOB = 'O'
                ne_type_last = ne_type

                list_entry.append((strToken, strPOS, strIOB))

            list_train.append(list_entry)
    return list_train


# map a Penn Treebank POS tag prefix to the matching WordNet POS constant (from stackoverflow)
def get_wordnet_pos(treebank_tag):
    if treebank_tag.startswith('J'):
        return nltk.corpus.wordnet.ADJ
    elif treebank_tag.startswith('V'):
        return nltk.corpus.wordnet.VERB
    elif treebank_tag.startswith('N'):
        return nltk.corpus.wordnet.NOUN
    elif treebank_tag.startswith('R'):
        return nltk.corpus.wordnet.ADV
    else:
        return None
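

# Optional sketch, not called by the coursework pipeline: one way to sanity-check a trained CRF on a
# held-out portion of the OntoNotes sentences using sklearn_crfsuite.metrics. The function name and the
# idea of keeping a separate dev split are illustrative assumptions, not part of the original script.
def evaluate_crf_sketch(crf, X_dev, Y_dev, target_labels):
    # predict label sequences for the held-out sentences
    Y_dev_pred = crf.predict(X_dev)
    # flat_f1_score flattens all sentences into one token stream and scores only the NE labels of interest
    return sklearn_crfsuite.metrics.flat_f1_score(Y_dev, Y_dev_pred, average='weighted', labels=target_labels)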
def create_test_list(lines):
    # tokenize and POS tag each paragraph so it matches the (token, pos) shape expected by task2_word2features
    out_sentences = []
    for line in lines:
        text = nltk.word_tokenize(line)
        pos_tags = nltk.pos_tag(text)
        out_sentences.append(pos_tags)
    return out_sentences


if __name__ == '__main__':
    if len(sys.argv) < 4:
        raise Exception('missing command line args : ' + repr(sys.argv))
    ontonotes_file = sys.argv[1]
    book_file = sys.argv[2]
    chapter_file = sys.argv[3]

    logger.info('ontonotes = ' + repr(ontonotes_file))
    logger.info('book = ' + repr(book_file))
    logger.info('chapter = ' + repr(chapter_file))

    # DO NOT CHANGE THE CODE IN THIS FUNCTION
    exec_ner(chapter_file, ontonotes_file)
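
# Example invocation (the script and data file names below are illustrative assumptions; substitute the
# names supplied with the assignment):
#   python task3_ner.py ontonotes_parsed.json book.txt chapter.txt
# This trains the CRF on the parsed OntoNotes sample and writes the filtered entities to ne.json.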