##### # # Collection of handy routines for testing shells # # Include this file with "import shelltests" # ##### import sys, imp, atexit, errno import os, re, time, pexpect, tempfile, proc_check, shutil, stat, signal, traceback from collections import namedtuple def test_success(msg = "", exit = True): """Print PASS message and optionally 'msg'. Exit unless 'exit is False""" print "PASS", msg if exit: sys.exit(0) def parse_regular_expression(pexpect_child, regex): """ A regular expression was printed and is matched into groups and returned. The return value will be a tuple of all of the parenthetical groups found in the regular expression. For example: regex = \[(\d+)\] (\d+) will return a tuple of the form (decimal_one, decimal_two) as it was read from the output of the shell "[decimal_one] decimal_two". """ pexpect_child.expect(regex) return __regex2tuple(pexpect_child.match) def __regex2tuple(match): """Turn a matched regular expression into a tuple of its captured groups""" return tuple([ match.group(i) for i in range(1, match.lastindex + 1) ]) def handle_exception(type, exc, tb): """Install a default exception handler. If there is a pexpect.TIMEOUT exception thrown at any time in the script, report that the test failed because of a timeout and exit. """ if type == pexpect.TIMEOUT: print "\n>>> FAIL: Test timed out", exc.get_trace(), "\n" else: print "\n>>> FAIL: ", type, "'", exc, "'\n" traceback.print_tb(tb) sys.excepthook = handle_exception console = None settings_module = None def setup_tests(additional_cmdline_arguments = []): global console global settings_module definitions_scriptname = sys.argv[1] settings_module = imp.load_source('', definitions_scriptname) logfile = None if hasattr(settings_module, 'logfile'): logfile = settings_module.logfile #spawn an instance of the shell console = pexpect.spawn(settings_module.shell + " ".join(map(str, additional_cmdline_arguments)), drainpty=True, logfile=logfile or sys.stdout) atexit.register(kill, shell_process=console) # set timeout for all following 'expect*' calls to 2 seconds console.timeout = 2 return console def sendline(line): console.sendline(line) def sendcontrol(ch): console.sendcontrol(ch) def sendintr(): console.sendintr() def expect(line, message=None): if message is None: message = 'expected "{}" but did not find' assert console.expect(line) == 0, message def expect_exact(line, message=None): if message is None: message = 'expected "{}" but did not find' assert console.expect_exact(line) == 0, message def expect_prompt(message=None): if message is None: message = 'shell did not return to prompt' assert console.expect(settings_module.prompt) == 0, message def expect_regex(regex): return parse_regular_expression(console, regex) def kill(shell_process): console.close(force=True) def wait_for_fg_child(): proc_check.wait_until_child_is_in_foreground(console) def parse_job_line(): job_status = namedtuple('job_status', ['id', 'status', 'command']) jid, status, cmd = expect_regex(settings_module.job_status_regex) for name, val in settings_module.jobs_status_msg.items(): if val == status: status = name break return job_status(jid, status, cmd) def parse_bg_status(): bg_status = namedtuple('bg_status', ['job_id', 'pid']) jid, pid = expect_regex(settings_module.bgjob_regex) return bg_status(jid, pid) def run_builtin(command, *args): command = settings_module.builtin_commands.get(command, command) sendline(command % tuple(args)) def assert_correct_fds(pid, message): '''Checks that file descriptors are not leaked into the child. ''' time.sleep(0.5) fds = sorted(os.listdir('/proc/{0}/fd'.format(pid))) if not (fds == list('012')): raise Exception('File descriptors leaked into child! Remember to close() all of the pipes and IO redir file descriptors') def get_shell_pid(): return console.pid def make_test_program(src, cflags="-O2"): exefile, exefilename = tempfile.mkstemp() os.close(exefile) ofile, ofilename = tempfile.mkstemp(suffix=".c") os.write(ofile, src) os.close(ofile) os.system("gcc %s %s -o %s" % (cflags, ofilename, exefilename)) os.unlink(ofilename) return exefilename def removefile(filename): """ Remove this file if it exists """ try: os.unlink(filename) except OSError as e: if e.errno != errno.ENOENT: raise