nlp-cw / task4_submission.py
task4_submission.py
Raw
# !/usr/bin/env python
# -*- coding: utf-8 -*-

######################################################################
#
# (c) Copyright University of Southampton, 2021
#
# Copyright in this software belongs to University of Southampton,
# Highfield, University Road, Southampton SO17 1BJ
#
# Created By : Stuart E. Middleton
# Created Date : 2021/01/29
# Project : Teaching
#
######################################################################

from __future__ import absolute_import, division, print_function, unicode_literals

import sys, codecs, json, math, time, warnings, re, logging

warnings.simplefilter(action='ignore', category=FutureWarning)

import nltk, numpy, scipy, sklearn, sklearn_crfsuite, sklearn_crfsuite.metrics

LOG_FORMAT = ('%(levelname) -s %(asctime)s %(message)s')
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger.info('logging started')


def exec_ner(file_chapter=None, ontonotes_file=None):
    # CHANGE CODE BELOW TO TRAIN A NER MODEL AND/OR USE REGEX GENERATE A SET OF BOOK CHARACTERS AND FILTERED SET OF NE TAGS (task 4)

    # Input >> www.gutenberg.org sourced plain text file for a chapter of a book
    # Output >> characters.txt = plain text set of extracted character names. one line per character name.

    # hardcoded output to show exactly what is expected to be serialized (you should change this)
    # only the allowed types for task 4 PERSON will be serialized
    dict1 = get_names(file_chapter, ontonotes_file)

    new_people = []
    if dict1:
        for person in dict1["PERSON"]:
            if person.endswith(tuple([c for c in "qwertyuiopasdfghjklzxcvbnmQWERTYYUIOPASDFGHJKLZXCVBNM"])):
                new_people.append(person)
    dictNE = {"PERSON": []}
    file_chapter = open(file_chapter, "r", encoding="utf8")
    lines = []
    current = ""
    for line in file_chapter:
        if line == "\n" or line == "\r\n":
            lines.append(current)
            current = ""
        else:
            current += line.replace("\n", " ").replace("\r", "")

    lines = [line.strip() for line in lines]

    book = "\n".join(lines)

    matches = re.findall(
        r"((?:Mrs?\.|Miss) [A-Z]\w+)|([A-Z]\. [A-Z]\w+)|(?:(?:’|'|\") said ((?:Mrs?\.|Miss) [A-Z]\w+))|(?:(?:’|'|\") said ([A-Z]\w+))|([A-Z]\w+)(?:’|')s|(?: ([A-Z][a-z]+ [A-Z][a-z]+) )",
        book,
        re.MULTILINE)
    for match in matches:
        for x in match:
            if x:
                text = nltk.word_tokenize(x)
                tags = nltk.pos_tag(text)
                include = True
                for _, tag in tags:
                    if not tag.startswith("NN"):
                        include = False
                if include:
                    dictNE["PERSON"].append(x.lower().strip())

    dictNE["PERSON"] += dict1["PERSON"]
    dictNE["PERSON"] += [n.lower().strip() for n in new_people]
    dictNE["PERSON"] = list(set(dictNE["PERSON"]))

    # DO NOT CHANGE THE BELOW CODE WHICH WILL SERIALIZE THE ANSWERS FOR THE AUTOMATED TEST HARNESS TO LOAD AND MARK

    # write out all PERSON entries for character list for subtask 4
    writeHandle = codecs.open('characters.txt', 'w', 'utf-8', errors='replace')
    if 'PERSON' in dictNE:
        for strNE in dictNE['PERSON']:
            writeHandle.write(strNE.strip().lower() + '\n')
    writeHandle.close()

names = set(nltk.corpus.names.words())

def get_names(file_chapter="eval_chapter.txt", ontonotes_file="ontonotes_parsed.json"):
    dictNE = {}
    # load parsed ontonotes dataset
    readHandle = codecs.open(ontonotes_file, 'r', 'utf-8-sig', errors='replace')
    str_json = readHandle.read()
    readHandle.close()
    dict_ontonotes = json.loads(str_json)

    list_files = list(dict_ontonotes.keys())

    training_list = create_train_list(list_files, dict_ontonotes)
    # print(len(training_list))


    new_list = []
    new_list_2 = []
    for sentence in training_list:
        found = False
        for word, postag, iob in sentence:
            if iob != "O":
                found = True
        if found:
            new_list.append(sentence)
        else:
            new_list_2.append(sentence)

    training_list = new_list[:round(len(new_list) * 0.75)] + new_list_2[:round(len(new_list) * 0.75)]

    # print(len(training_list))

    stops = set(nltk.corpus.stopwords.words("english"))

    X_train = []
    Y_train = []
    for sent in training_list:
        features1 = []
        new_sent = []
        new_labels = []
        for word, postag, label in sent:
            # word = re.sub(r'[^\w\s]','', word)
            new_sent.append((word, postag, label))
            new_labels.append(label)

        for i in range(len(new_sent)):
            features = task2_word2features(sent, i)
            features1.append(features)
        if features1:
            X_train.append(features1)
            Y_train.append(new_labels)

    # X_train = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in training_list]
    # Y_train = [[label for _, _, label in sent] for sent in training_list]
    labels = ['I-DATE', 'B-DATE', 'I-CARDINAL', 'B-CARDINAL', 'I-ORDINAL', 'B-ORDINAL', 'I-NORP', 'B-NORP']

    # train CRF
    crf = sklearn_crfsuite.CRF(
        c1=100,
        c2=0.1,
        algorithm="lbfgs",
        max_iterations=150,
        all_possible_transitions=True,
    )

    crf.fit(X_train, Y_train)

    file_chapter = open(file_chapter, "r", encoding="utf-8-sig").readlines()
    lines = []
    current = ""
    for line in file_chapter:
        if line == "\n" or line == "\r\n":
            lines.append(current)
            current = ""
        else:
            current += line.replace("\n", " ").replace("\r", "")

    lines = [line.strip() for line in lines]
    test_data = create_test_list(lines)

    X_test = []
    used_sents = []
    for sent in test_data:
        features1 = []
        new_sent = []

        for word, postag in sent:
            # word = re.sub(r'[^\w\s]', '', word)
            new_sent.append((word, postag))

        for i in range(len(new_sent)):
            features = task2_word2features(sent, i)
            features1.append(features)
        if features1:
            X_test.append(features1)
            used_sents.append(new_sent)

    test_data = used_sents

    # X_test = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in test_data]

    Y_pred = crf.predict(X_test)

    current_one = ("", [])
    for i, label_list in enumerate(Y_pred):
        for j, label in enumerate(label_list):
            if label.startswith("B-"):
                if current_one[0] != "" and current_one[0] not in dictNE.keys():
                    dictNE[current_one[0]] = []
                if current_one[0] in dictNE.keys():
                    dictNE[current_one[0]].append(" ".join(current_one[1]).strip().lower())
                ne_type = label[2:]
                current_one = (ne_type, [])
                current_one[1].append(test_data[i][j][0])
            elif label.startswith("I-"):
                ne_type = label[2:]
                current_one[1].append(test_data[i][j][0])

    listKeys = list(dictNE.keys())
    for strKey in listKeys:
        dictNE[strKey] = list(set(dictNE[strKey]))

    listAllowedTypes = ["PERSON"]
    listKeys = list(dictNE.keys())
    for strKey in listKeys:
        for nIndex in range(len(dictNE[strKey])):
            dictNE[strKey][nIndex] = dictNE[strKey][nIndex].strip().lower()
        if not strKey in listAllowedTypes:
            del dictNE[strKey]

    return dictNE


def task2_word2features(sent, i):
    lem = nltk.stem.wordnet.WordNetLemmatizer()
    stem = nltk.stem.porter.PorterStemmer()

    word = sent[i][0]
    postag = sent[i][1]
    shape_long = ""
    for count, c in enumerate(word):
        if c.isupper() and (count == 0 or (count > 0 and shape_long[-1] != "X")):
            shape_long += "X"
        elif c.islower() and (count == 0 or (count > 0 and shape_long[-1] != "x")):
            shape_long += "x"
        elif c.isdigit() and (count == 0 or (count > 0 and shape_long[-1] != "d")):
            shape_long += "d"
        else:
            shape_long += c

    features = {
        'word': word,
        "word.lower()": word.lower(),
        'postag': postag,

        # token shape
        "word.shape": shape_long,

        # token suffix
        "word.stem": stem.stem(word),
        "word.name": 1 if word in names else 0
    }
    if i > 0:
        word_prev = sent[i - 1][0]
        postag_prev = sent[i - 1][1]
        shape_long = ""
        for count, c in enumerate(word_prev):
            if c.isupper() and (count == 0 or (count > 0 and shape_long[-1] != "X")):
                shape_long += "X"
            elif c.islower() and (count == 0 or (count > 0 and shape_long[-1] != "x")):
                shape_long += "x"
            elif c.isdigit() and (count == 0 or (count > 0 and shape_long[-1] != "d")):
                shape_long += "d"
            else:
                shape_long += c
        features.update({
            '-1:word': word_prev,
            "-1:word.lower()": word_prev.lower(),
            '-1:postag': postag_prev,
            "-1:word.shape": shape_long,
            "-1:word.stem": stem.stem(word_prev),
            "-1:word.name": 1 if word_prev in names else 0
        })
    else:
        features['BOS'] = True

    if i > 1:
        word_prev = sent[i - 2][0]
        postag_prev = sent[i - 2][1]
        shape_long = ""
        for count, c in enumerate(word_prev):
            if c.isupper() and (count == 0 or (count > 0 and shape_long[-1] != "X")):
                shape_long += "X"
            elif c.islower() and (count == 0 or (count > 0 and shape_long[-1] != "x")):
                shape_long += "x"
            elif c.isdigit() and (count == 0 or (count > 0 and shape_long[-1] != "d")):
                shape_long += "d"
            else:
                shape_long += c
        features.update({
            '-2:word': word_prev,
            "-2:word.lower()": word_prev.lower(),
            '-2:postag': postag_prev,
            "-2:word.shape": shape_long,
            "-2:word.stem": stem.stem(word_prev),
            "-2:word.name": 1 if word_prev in names else 0
        })

    if i < len(sent) - 1:
        word_next = sent[i + 1][0]
        postag_next = sent[i + 1][1]
        shape_long = ""
        for count, c in enumerate(word_next):
            if c.isupper() and (count == 0 or (count > 0 and shape_long[-1] != "X")):
                shape_long += "X"
            elif c.islower() and (count == 0 or (count > 0 and shape_long[-1] != "x")):
                shape_long += "x"
            elif c.isdigit() and (count == 0 or (count > 0 and shape_long[-1] != "d")):
                shape_long += "d"
            else:
                shape_long += c
        features.update({
            '+1:word': word_next,
            "+1:word.lower()": word_next.lower(),
            '+1:postag': postag_next,
            "+1:word.shape": shape_long,
            "+1:word.stem": stem.stem(word_next),
            "+1:word.name": 1 if word_next in names else 0

        })
    else:
        features['EOS'] = True

    if i < len(sent) - 2:
        word_next = sent[i + 2][0]
        postag_next = sent[i + 2][1]
        shape_long = ""
        for i, c in enumerate(word_next):
            if c.isupper() and (i == 0 or (i > 0 and shape_long[-1] != "X")):
                shape_long += "X"
            elif c.islower() and (i == 0 or (i > 0 and shape_long[-1] != "x")):
                shape_long += "x"
            elif c.isdigit() and (i == 0 or (i > 0 and shape_long[-1] != "d")):
                shape_long += "d"
            else:
                shape_long += c
        features.update({
            '+2:word': word_next,
            "+2:word.lower()": word_next.lower(),
            '+2:postag': postag_next,
            "+2:word.shape": shape_long,
            "+2:word.stem": stem.stem(word_next),
            "+2:word.name": 1 if word_next in names else 0

        })

    return features

def create_train_list(data, dict_ontonotes):
    # sent = (tokens, pos, IOB_label)
    list_train = []
    for str_file in data:
        for str_sent_index in dict_ontonotes[str_file]:
            # ignore sents with non-PENN POS tags
            if 'XX' in dict_ontonotes[str_file][str_sent_index]['pos']:
                continue
            if 'VERB' in dict_ontonotes[str_file][str_sent_index]['pos']:
                continue


            list_entry = []

            # compute IOB tags for named entities (if any)
            ne_type_last = None
            for nTokenIndex in range(len(dict_ontonotes[str_file][str_sent_index]['tokens'])):
                strToken = dict_ontonotes[str_file][str_sent_index]['tokens'][nTokenIndex]
                strPOS = dict_ontonotes[str_file][str_sent_index]['pos'][nTokenIndex]
                ne_type = None
                if 'ne' in dict_ontonotes[str_file][str_sent_index]:
                    dict_ne = dict_ontonotes[str_file][str_sent_index]['ne']
                    if not 'parse_error' in dict_ne:
                        for str_NEIndex in dict_ne:
                            if nTokenIndex in dict_ne[str_NEIndex]['tokens']:
                                ne_type = dict_ne[str_NEIndex]['type']
                                break
                if ne_type != None and ne_type in ["PERSON"]:
                    if ne_type == ne_type_last:
                        strIOB = 'I-' + ne_type
                    else:
                        strIOB = 'B-' + ne_type
                else:
                    strIOB = 'O'
                ne_type_last = ne_type

                list_entry.append((strToken, strPOS, strIOB))

            list_train.append(list_entry)
    return list_train

# from stackoverflow
def get_wordnet_pos(treebank_tag):
    if treebank_tag.startswith('J'):
        return nltk.corpus.wordnet.ADJ
    elif treebank_tag.startswith('V'):
        return nltk.corpus.wordnet.VERB
    elif treebank_tag.startswith('N'):
        return nltk.corpus.wordnet.NOUN
    elif treebank_tag.startswith('R'):
        return nltk.corpus.wordnet.ADV
    else:
        return None

def create_test_list(lines):
    out_sentences = []
    for line in lines:
        if line:
            text = nltk.word_tokenize(line)
            pos_tags = nltk.pos_tag(text)
            out_sentences.append(pos_tags)
    return out_sentences


if __name__ == '__main__':
    if len(sys.argv) < 4:
        raise Exception('missing command line args : ' + repr(sys.argv))
    ontonotes_file = sys.argv[1]
    book_file = sys.argv[2]
    chapter_file = sys.argv[3]

    logger.info('ontonotes = ' + repr(ontonotes_file))
    logger.info('book = ' + repr(book_file))
    logger.info('chapter = ' + repr(chapter_file))

    # DO NOT CHANGE THE CODE IN THIS FUNCTION

    exec_ner(chapter_file, ontonotes_file)