nlp-cw / task3_submission.py
task3_submission.py
Raw
# !/usr/bin/env python
# -*- coding: utf-8 -*-

######################################################################
#
# (c) Copyright University of Southampton, 2021
#
# Copyright in this software belongs to University of Southampton,
# Highfield, University Road, Southampton SO17 1BJ
#
# Created By : Stuart E. Middleton
# Created Date : 2021/01/29
# Project : Teaching
#
######################################################################

from __future__ import absolute_import, division, print_function, unicode_literals

import sys, codecs, json, math, time, warnings, re, logging

warnings.simplefilter(action='ignore', category=FutureWarning)

import nltk, numpy, scipy, sklearn, sklearn_crfsuite, sklearn_crfsuite.metrics

# Layout of every log line: level name, timestamp, then the message text.
LOG_FORMAT = ('%(levelname) -s %(asctime)s %(message)s')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Configure the root logger once, at import time, with INFO as the threshold.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger.info('logging started')


def exec_ner(file_chapter=None, ontonotes_file=None):
    """Train a CRF NER model on OntoNotes and tag a book chapter (task 3).

    The model is trained on (at most) the first 1000 documents of the parsed
    OntoNotes JSON file, then applied to every blank-line-separated paragraph
    of the chapter. The distinct, lower-cased entity phrases are serialized
    to ``ne.json`` in the current working directory as
    ``{ <ne_type> : [ <phrase>, <phrase>, ... ] }``.

    :param file_chapter: path of a www.gutenberg.org sourced plain text file
        holding one chapter of a book
    :param ontonotes_file: path of the parsed OntoNotes JSON dataset
    :return: None (the result is written to ``ne.json``)
    """
    # CHANGE CODE BELOW TO TRAIN A CRF NER MODEL TO TAG THE CHAPTER OF TEXT (task 3)

    # Input >> www.gutenberg.org sourced plain text file for a chapter of a book
    # Output >> ne.json = { <ne_type> : [ <phrase>, <phrase>, ... ] }

    # only the allowed types for task 3 DATE, CARDINAL, ORDINAL, NORP will be serialized
    dictNE = {}

    # load parsed ontonotes dataset (context manager closes the handle even on error)
    with codecs.open(ontonotes_file, 'r', 'utf-8-sig', errors='replace') as readHandle:
        dict_ontonotes = json.loads(readHandle.read())

    # train on at most the first 1000 documents
    list_files = list(dict_ontonotes.keys())[:1000]
    training_list = create_train_list(list_files, dict_ontonotes)

    X_train = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in training_list]
    Y_train = [[label for _, _, label in sent] for sent in training_list]

    # train CRF
    crf = sklearn_crfsuite.CRF(
        c1=100,
        c2=0.1,
        algorithm="lbfgs",
        max_iterations=50,
        all_possible_transitions=True,
    )
    crf.fit(X_train, Y_train)

    # tag the chapter, one paragraph at a time
    test_data = create_test_list(_read_paragraphs(file_chapter))
    X_test = [[task2_word2features(sent, i) for i in range(len(sent))] for sent in test_data]
    Y_pred = crf.predict(X_test)

    # collect the distinct phrases per entity type
    for ne_type, phrase in _decode_entities(test_data, Y_pred):
        dictNE.setdefault(ne_type, set()).add(phrase)

    # the serialization code below indexes the values, so they must be lists;
    # sorting makes the written file deterministic
    for strKey in list(dictNE.keys()):
        dictNE[strKey] = sorted(dictNE[strKey])

    # DO NOT CHANGE THE BELOW CODE WHICH WILL SERIALIZE THE ANSWERS FOR THE AUTOMATED TEST HARNESS TO LOAD AND MARK

    # FILTER NE dict by types required for task 3
    listAllowedTypes = ['DATE', 'CARDINAL', 'ORDINAL', 'NORP']
    listKeys = list(dictNE.keys())
    for strKey in listKeys:
        for nIndex in range(len(dictNE[strKey])):
            dictNE[strKey][nIndex] = dictNE[strKey][nIndex].strip().lower()
        if not strKey in listAllowedTypes:
            del dictNE[strKey]

    # write filtered NE dict
    writeHandle = codecs.open('ne.json', 'w', 'utf-8', errors='replace')
    strJSON = json.dumps(dictNE, indent=2)
    writeHandle.write(strJSON + '\n')
    writeHandle.close()


def _read_paragraphs(path):
    """Return the paragraphs of a text file, each joined onto a single line.

    Paragraphs are separated by one or more blank (or whitespace-only) lines.
    A final paragraph that is not followed by a blank line is still returned.
    """
    paragraphs = []
    current = []
    with open(path, "r", encoding="utf-8-sig") as handle:
        for line in handle:
            text = line.strip()
            if text:
                current.append(text)
            elif current:
                paragraphs.append(" ".join(current))
                current = []
    if current:
        # the file did not end with a blank line: keep the trailing paragraph
        paragraphs.append(" ".join(current))
    return paragraphs


def _decode_entities(sentences, predictions):
    """Turn per-token IOB labels into a list of (ne_type, phrase) pairs.

    An entity is closed by an 'O' label, by a 'B-' label, by an 'I-' label of
    a different type, or by the end of its sentence. An 'I-' label that does
    not continue an open entity of the same type starts a new one.
    Phrases are the entity tokens joined by single spaces, lower-cased.
    """
    entities = []
    for sent, label_list in zip(sentences, predictions):
        ne_type = None
        tokens = []
        for token_entry, label in zip(sent, label_list):
            prefix, _sep, label_type = label.partition('-')
            continues = prefix == 'I' and bool(tokens) and label_type == ne_type
            if not continues:
                if tokens:
                    entities.append((ne_type, " ".join(tokens).strip().lower()))
                tokens = []
                ne_type = label_type if prefix in ('B', 'I') else None
            if ne_type is not None:
                tokens.append(token_entry[0])
        if tokens:
            # flush the entity still open at the end of the sentence
            entities.append((ne_type, " ".join(tokens).strip().lower()))
    return entities


def task2_word2features(sent, i):
    """Build the CRF feature dict for token ``i`` of a sentence.

    :param sent: sequence of token entries whose first two items are the
        token text and its POS tag (any further items are ignored)
    :param i: index of the token to describe
    :return: dict of features for the token itself, for its left neighbour
        (keys prefixed ``-1:``) and for its right neighbour (keys prefixed
        ``+1:``); ``BOS`` / ``EOS`` mark the first / last token instead
    """
    lemmatizer, stemmer = _feature_tools()

    word = sent[i][0]
    postag = sent[i][1]
    features = {'word': word}
    features.update(_token_features(word, postag, '', lemmatizer, stemmer))

    if i > 0:
        features.update(
            _token_features(sent[i - 1][0], sent[i - 1][1], '-1:', lemmatizer, stemmer))
    else:
        features['BOS'] = True

    if i < len(sent) - 1:
        features.update(
            _token_features(sent[i + 1][0], sent[i + 1][1], '+1:', lemmatizer, stemmer))
    else:
        features['EOS'] = True

    return features


# Lazily created, shared lemmatizer/stemmer so they are not rebuilt per token.
_FEATURE_TOOLS = {}


def _feature_tools():
    """Return the shared (lemmatizer, stemmer) pair, creating it on first use."""
    if not _FEATURE_TOOLS:
        _FEATURE_TOOLS['lemmatizer'] = nltk.stem.wordnet.WordNetLemmatizer()
        _FEATURE_TOOLS['stemmer'] = nltk.stem.porter.PorterStemmer()
    return _FEATURE_TOOLS['lemmatizer'], _FEATURE_TOOLS['stemmer']


def _token_features(word, postag, prefix, lemmatizer, stemmer):
    """Return shape, POS, stem and lemma features of one token, keys prefixed."""
    lowered = word.lower()
    features = {
        # token shape
        prefix + 'word.lower()': lowered,
        prefix + 'postag': postag,
        prefix + 'word.isupper()': word.isupper(),
        prefix + 'word.istitle()': word.istitle(),
        prefix + 'word.isdigit()': word.isdigit(),

        # token suffix
        prefix + 'word.suffix': lowered[-3:],

        # POS prefix
        prefix + 'postag[:2]': postag[:2],

        # stem
        prefix + 'stem': stemmer.stem(word),
    }
    # lemma: only when the tag maps onto a WordNet POS; the lemma is taken of
    # THIS token (the neighbour itself for the -1:/+1: feature groups)
    wordnet_pos = get_wordnet_pos(postag)
    if wordnet_pos is not None:
        features[prefix + 'lemma'] = lemmatizer.lemmatize(word, wordnet_pos)
    return features


def create_train_list(data, dict_ontonotes):
    """Convert parsed OntoNotes sentences into IOB-labelled training sentences.

    :param data: iterable of document keys of ``dict_ontonotes`` to use
    :param dict_ontonotes: parsed dataset,
        ``{doc: {sent_index: {'tokens': [...], 'pos': [...], 'ne': {...}}}}``
    :return: list of sentences, each a list of ``(token, pos, IOB_label)``
        tuples. Only DATE, ORDINAL, CARDINAL and NORP entities are labelled;
        every other token gets ``'O'``. Sentences whose POS list contains
        ``'XX'`` or ``'VERB'`` are skipped.
    """
    allowed_types = ("DATE", "ORDINAL", "CARDINAL", "NORP")

    # sent = (tokens, pos, IOB_label)
    list_train = []
    for str_file in data:
        for str_sent_index in dict_ontonotes[str_file]:
            dict_sent = dict_ontonotes[str_file][str_sent_index]
            list_pos = dict_sent['pos']

            # ignore sents with non-PENN POS tags
            if 'XX' in list_pos or 'VERB' in list_pos:
                continue

            # map every token index to the first entity that covers it, once
            # per sentence instead of rescanning all entities for each token
            dict_ne = dict_sent['ne'] if 'ne' in dict_sent else {}
            token_to_ne = {}
            if 'parse_error' not in dict_ne:
                for str_NEIndex in dict_ne:
                    for nIndex in dict_ne[str_NEIndex]['tokens']:
                        token_to_ne.setdefault(nIndex, str_NEIndex)

            list_entry = []

            # compute IOB tags for named entities (if any)
            ne_index_last = None
            for nTokenIndex, strToken in enumerate(dict_sent['tokens']):
                strPOS = list_pos[nTokenIndex]
                ne_index = token_to_ne.get(nTokenIndex)
                ne_type = dict_ne[ne_index]['type'] if ne_index is not None else None
                if ne_type in allowed_types:
                    # compare the entity, not just its type, so two adjacent
                    # entities of the same type each start with their own B- tag
                    if ne_index == ne_index_last:
                        strIOB = 'I-' + ne_type
                    else:
                        strIOB = 'B-' + ne_type
                else:
                    strIOB = 'O'
                ne_index_last = ne_index

                list_entry.append((strToken, strPOS, strIOB))

            list_train.append(list_entry)
    return list_train

# from stackoverflow
def get_wordnet_pos(treebank_tag):
    """Translate a Treebank POS tag into the matching WordNet POS constant.

    The decision is made on the first letter of the tag: J -> ADJ, V -> VERB,
    N -> NOUN, R -> ADV. Any other tag (including an empty one) gives None.
    """
    # first letter of the tag -> name of the constant on nltk.corpus.wordnet
    constant_by_initial = {'J': 'ADJ', 'V': 'VERB', 'N': 'NOUN', 'R': 'ADV'}
    constant_name = constant_by_initial.get(treebank_tag[:1])
    if constant_name is None:
        return None
    # looked up only on a match, so unmapped tags never touch the corpus loader
    return getattr(nltk.corpus.wordnet, constant_name)

def create_test_list(lines):
    """Tokenize and POS-tag each line of text with NLTK.

    :param lines: iterable of text strings
    :return: list with one entry per input line, each being the result of
        ``nltk.pos_tag`` applied to the ``nltk.word_tokenize`` tokens
    """
    return [nltk.pos_tag(nltk.word_tokenize(text_line)) for text_line in lines]


# Script entry point.
# Usage: task3_submission.py <ontonotes_file> <book_file> <chapter_file>
if __name__ == '__main__':
    # all three positional arguments are required
    if len(sys.argv) < 4:
        raise Exception('missing command line args : ' + repr(sys.argv))
    ontonotes_file = sys.argv[1]
    # book_file is read and logged here but is not passed on to exec_ner
    book_file = sys.argv[2]
    chapter_file = sys.argv[3]

    logger.info('ontonotes = ' + repr(ontonotes_file))
    logger.info('book = ' + repr(book_file))
    logger.info('chapter = ' + repr(chapter_file))

    # DO NOT CHANGE THE CODE IN THIS FUNCTION

    # note the argument order: chapter file first, then the ontonotes dataset
    exec_ner(chapter_file, ontonotes_file)