basic-shell / tests / shell_test_helpers.py
shell_test_helpers.py
Raw
from unittest import TestCase, TextTestResult
import re
import subprocess as proc

TIMEOUT = 30

######################
# Customize unittest #
######################

RED = "\u001b[31m"
GREEN = "\u001b[32m"
YELLOW = "\u001b[33m"
BLUE = "\u001b[34m"
MAGENTA = "\u001b[35m"
WHITE = "\u001b[37m"
RESET = "\u001b[0m"


class ShellTestCase(TestCase):
    def __init__(self, command, *args, **kwargs):
        self.shell_command = command
        super().__init__(*args, **kwargs)

    def run_shell(self, inp):
        rc, output = execute(self.shell_command, input = inp)
        self.assertEqual(rc, 0)
        return filter_shell_output(output)

class PrettierTextTestResult(TextTestResult):
    """A test result class that can print formatted text results to a stream.
    Used by TextTestRunner.
    """
    separator1 = '=' * 70
    separator2 = '-' * 70


    def __init__(self, stream, descriptions, verbosity):
        super(PrettierTextTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.showAll = True
        self.dots = verbosity == 1
        self.descriptions = descriptions
        self._newline = True

    def getDescription(self, test):
        doc_first_line = test.shortDescription()
        return doc_first_line

    def startTest(self, test):
        super(TextTestResult, self).startTest(test)
        if self.showAll:
            self.stream.write(self.getDescription(test))
            self.stream.write(" ... ")
            self.stream.flush()
            self._newline = False

    def _write_status(self, test, status):
        self.stream.writeln(status)
        self.stream.flush()
        self._newline = True

    def addSubTest(self, test, subtest, err):
        if err is not None:
            if self.showAll:
                if issubclass(err[0], subtest.failureException):
                    self._write_status(subtest, f"{RED}FAIL{RESET}")
                else:
                    self._write_status(subtest, f"{RED}ERROR{RESET}")
            elif self.dots:
                if issubclass(err[0], subtest.failureException):
                    self.stream.write('F')
                else:
                    self.stream.write('E')
                self.stream.flush()
        super(TextTestResult, self).addSubTest(test, subtest, err)

    def addSuccess(self, test):
        super(TextTestResult, self).addSuccess(test)
        if self.showAll:
            self._write_status(test, f"{GREEN}ok{RESET}")
        elif self.dots:
            self.stream.write('.')
            self.stream.flush()

    def addError(self, test, err):
        super(TextTestResult, self).addError(test, err)
        if self.showAll:
            self._write_status(test, f"{RED}ERROR{RESET}")
        elif self.dots:
            self.stream.write('E')
            self.stream.flush()

    def addFailure(self, test, err):
        super(TextTestResult, self).addFailure(test, err)
        if self.showAll:
            self._write_status(test, f"{RED}FAIL{RESET}")
        elif self.dots:
            self.stream.write('F')
            self.stream.flush()

    def addSkip(self, test, reason):
        super(TextTestResult, self).addSkip(test, reason)
        if self.showAll:
            self._write_status(test, "skipped {0!r}".format(reason))
        elif self.dots:
            self.stream.write("s")
            self.stream.flush()

    def addExpectedFailure(self, test, err):
        super(TextTestResult, self).addExpectedFailure(test, err)
        if self.showAll:
            self.stream.writeln("expected failure")
            self.stream.flush()
        elif self.dots:
            self.stream.write("x")
            self.stream.flush()

    def addUnexpectedSuccess(self, test):
        super(TextTestResult, self).addUnexpectedSuccess(test)
        if self.showAll:
            self.stream.writeln("unexpected success")
            self.stream.flush()
        elif self.dots:
            self.stream.write("u")
            self.stream.flush()

    def printErrors(self):
        if self.dots or self.showAll:
            self.stream.writeln()
            self.stream.flush()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)
        unexpectedSuccesses = getattr(self, 'unexpectedSuccesses', ())
        if unexpectedSuccesses:
            self.stream.writeln(self.separator1)
            for test in unexpectedSuccesses:
                self.stream.writeln(f"UNEXPECTED SUCCESS: {self.getDescription(test)}")
            self.stream.flush()

    def printErrorList(self, flavour, errors):
        for test, err in errors:
            self.stream.writeln(self.separator1)
            self.stream.writeln(f"%s: {MAGENTA}%s{RESET}" % (flavour,self.getDescription(test)))
            self.stream.writeln(self.separator2)
            self.stream.writeln("%s" % err)
            self.stream.flush()


####################
# Our test helpers #
####################

def sh(command):
    outb = proc.run(command, shell = True, capture_output = True).stdout
    return try_decode(outb).strip()

def execute(*args, input = None):
    if input != None:
        stdin = proc.PIPE
        input = input.encode('ASCII')
    else:
        stdin = None

    try:
        exe = proc.Popen(args, 
                         stdin = stdin, 
                         stdout = proc.PIPE, 
                         stderr = proc.STDOUT)
    except OSError as exc:
        raise RuntimeError(f"Execution Error: {exc}") from exc

    try:
        outb = exe.communicate(input = input, timeout = TIMEOUT)[0]
        out = try_decode(outb).strip()
        ret = exe.returncode
    except proc.TimeoutExpired as exc:
        try:
            exe.terminate()        # Kill Nicely
            exe.wait(timeout=.500) # Wait 500ms for exit
        except proc.TimeoutExpired:
            exe.kill()             # Process didn't exit; really kill
            exe.poll()             # Reap Defunct Process
        raise RuntimeError(f"It seems something went wrong and the program didn't finish within {TIMEOUT}s") from exc
    else:
        return (ret, out)

# inspired by https://stackoverflow.com/a/15918519
def try_decode(bytes, codecs=['ascii', 'utf8', 'latin-1']):
    exc = None
    for codec in codecs:
        try:
            return bytes.decode(codec)
        except UnicodeDecodeError as e:
            exc = e

    if exc != None:
        raise exc

def filter_shell_output(output):
    lines = output.splitlines()

    def filter_line(line):
        return re.sub(r'[Bb]ye [Bb]ye[.!]? *', '', 
                      re.sub(r'shell ?\$ *', '', 
                             re.sub(r'Welcome to mini-shell[.!]? *', '', 
                                    line)))

    def is_empty(line): return line.strip() != ''

    # Filter out parts of the shell output and remove any empty lines
    filtered_lines = filter(is_empty, map(filter_line, lines))

    return "\n".join(filtered_lines)