import sys, re from subprocess import \ check_output, PIPE, STDOUT, DEVNULL, CalledProcessError, TimeoutExpired from os.path import abspath, basename, dirname, exists, join, splitext, isdir from getopt import getopt, GetoptError from os import chdir, environ, getcwd, mkdir, remove, access, W_OK from shutil import copyfile, rmtree from math import log from glob import glob SHORT_USAGE = """\ Usage: python3 runner.py OPTIONS TEST.in ... OPTIONS may include --keep Keep test directories --lib=DIR Relative path to directory containing CS61BL libraries --timeout=SEC Default number of seconds allowed to each execution of gitlet. --src=SRC Use SRC instead of "src" as the subdirectory containing files referenced by + and =. --tolerance=N Set the maximum allowed edit distance between program output and expected output to N (default 3). --verbose Print extra information about execution. """ USAGE = SHORT_USAGE + """\ For each TEST.in, change to an empty directory, and execute the instructions in TEST.in. Before executing an instruction, first replace any occurrence of ${VAR} with the current definition of VAR (see the D command below). Replace any occurrence of ${N} for non-negative decimal numeral N with the value of the Nth captured group in the last ">" command's expected output lines. Undefined if the last ">" command did not end in "<<<*", or did not have the indicated group. N=0 indicates the entire matched string. The instructions each have one of the following forms: # ... A comment, producing no effect. I FILE Include. Replace this statement with the contents of FILE, interpreted relative to the directory containing the .in file. C DIR Create, if necessary, and switch to a subdirectory named DIR under the main directory for this test. If DIR is missing, changes back to the default directory. This command is principally intended to let you set up remote repositories. T N Set the timeout for gitlet commands in the rest of this test to N seconds. + NAME F Copy the contents of src/F into a file named NAME. - NAME Delete the file named NAME. > COMMAND OPERANDS LINE1 LINE2 ... <<< Run gitlet.Main with COMMAND ARGUMENTS as its parameters. Compare its output with LINE1, LINE2, etc., reporting an error if there is "sufficient" discrepency. The <<< delimiter may be followed by an asterisk (*), in which case, the preceding lines are treated as Python regular expressions and matched accordingly. The directory or JAR file containing the gitlet.Main program is assumed to be in directory DIR specifed by --progdir (default is ..). = NAME F Check that the file named NAME is identical to src/F, and report an error if not. * NAME Check that the file NAME does not exist, and report an error if it does. E NAME Check that file or directory NAME exists, and report an error if it does not. D VAR "VALUE" Defines the variable VAR to have the literal value VALUE. VALUE is taken to be a raw Python string (as in r"VALUE"). Substitutions are first applied to VALUE. For each TEST.in, reports at most one error. Without the --show option, simply indicates tests passed and failed. If N is postive, also prints details of the first N failing tests. With --show=all, shows details of all failing tests. With --keep, keeps the directories created for the tests (with names TEST.dir). When finished, reports number of tests passed and failed, and the number of faulty TEST.in files.""" DIRECTORY_LAYOUT_ERROR = """\ Your {} folder is not where we expected it. Please ensure that your directory structure matches the following: sp21-s*** ├── library-sp21 │ └── ... ├── proj2 │ ├── gitlet │ ├── testing <==== This should be your CWD │ │ ├── runner.py │ │ └── ... │ └── ... └── ... Note your CWD must be `repo/proj2/testing`""" JAVA_COMMAND = "java" CAPERS_COMMAND = "gitlet.Main" JAVAC_COMMAND = "javac -d ." JVM_COMMAND = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005" TIMEOUT = 10 DEBUG = False DEBUG_MSG = \ """ ============================================================================ | ~~~~~ You are in debug mode ~~~~~ | | In this mode, you will be shown each command from the test case. | | | | There are three commands: | | | | 1. 'n' - type in 'n' to go to the next command without debugging the | | current one (analogous to "Step Over" in IntelliJ). | | | | 2. 's' - type in 's' to debug the current command (analogous to | | "Step Into" in IntelliJ). Make sure to set breakpoints! | | | | 3. 'q' - type in 'q' to quit and stop debugging. If you had the `--keep` | | flag, then your directory state will be saved and you can | | investigate it. | ============================================================================ """ def Usage(): print(SHORT_USAGE, file=sys.stderr) sys.exit(1) Mat = None def Match(patn, s): global Mat Mat = re.match(patn, s) return Mat def Group(n): return Mat.group(n) def contents(filename): try: with open(filename) as inp: return inp.read() except FileNotFoundError: return None def editDistance(s1, s2): dist = [list(range(len(s2) + 1))] + \ [ [i] + [ 0 ] * len(s2) for i in range(1, len(s1) + 1) ] for i in range(1, len(s1) + 1): for j in range(1, len(s2) + 1): dist[i][j] = min(dist[i-1][j] + 1, dist[i][j-1] + 1, dist[i-1][j-1] + (s1[i-1] != s2[j-1])) return dist[len(s1)][len(s2)] def nextCommand(full_cmnd, timeout): return check_output(full_cmnd, shell=True, universal_newlines=True, stdin=DEVNULL, stderr=STDOUT, timeout=timeout) def stepIntoCommand(full_cmnd): out = check_output(full_cmnd, shell=True, universal_newlines=True, stdin=DEVNULL, stderr=STDOUT, timeout=None) return out.split("\n", 1)[1] def createTempDir(base): for n in range(100): name = "{}_{}".format(base, n) try: mkdir(name) return name except OSError: pass else: raise ValueError("could not create temp directory for {}".format(base)) def cleanTempDir(dir): rmtree(dir, ignore_errors=True) def doDelete(name, dir): try: remove(join(dir, name)) except OSError: pass def doCopy(dest, src, dir): try: doDelete(dest, dir) copyfile(join(src_dir, src), join(dir, dest)) except OSError: raise ValueError("file {} could not be copied to {}".format(src, dest)) def doCompile(target): out = "" try: full_cmnd = "{} {}".format(JAVAC_COMMAND, target) out = check_output(full_cmnd, shell=True, universal_newlines=True, stdin=DEVNULL, stderr=STDOUT) return "OK", out except CalledProcessError as excp: return ("javac exited with code {}".format(excp.args[0]), excp.output) def doExecute(cmnd, dir, timeout): here = getcwd() out = "" try: chdir(dir) full_cmnd = "{} {} {}".format(JAVA_COMMAND, CAPERS_COMMAND, cmnd) if DEBUG: print(">>> gitlet {}".format(cmnd)) next_cmd = input("> ").strip().lower() while(next_cmd not in {'s', 'n', 'q'}): print("Please enter either 'n' or 's'.") next_cmd = input("> ").strip().lower() if next_cmd == "n": out = nextCommand(full_cmnd, timeout) elif next_cmd == "s": full_cmnd = "{} {} {} {}".format(JAVA_COMMAND, JVM_COMMAND, CAPERS_COMMAND, cmnd) print(f"Ready to debug the command `gitlet {cmnd}`") print("Open IntelliJ and hit the \"Debug\" button. Don't forget to set a breakpoint!") out = stepIntoCommand(full_cmnd) elif next_cmd == "q": return "User Exit", None else: out = nextCommand(full_cmnd, timeout) if superverbose: print(out) return "OK", out except CalledProcessError as excp: return ("java gitlet.Main exited with code {}".format(excp.args[0]), excp.output) except TimeoutExpired: return "timeout", None finally: chdir(here) def canonicalize(s): if s is None: return None return re.sub('\r', '', s) def fileExists(f, dir): return exists(join(dir, f)) def correctFileOutput(name, expected, dir): userData = canonicalize(contents(join(dir, name))) stdData = canonicalize(contents(join(src_dir, expected))) return userData == stdData def correctProgramOutput(expected, actual, last_groups, is_regexp): expected = re.sub(r'[ \t]+\n', '\n', '\n'.join(expected)) expected = re.sub(r'(?m)^[ \t]+', ' ', expected) actual = re.sub(r'[ \t]+\n', '\n', actual) actual = re.sub(r'(?m)^[ \t]+', ' ', actual) last_groups[:] = (actual,) if is_regexp: try: if not Match(expected.rstrip() + r"\Z", actual) \ and not Match(expected.rstrip() + r"\Z", actual.rstrip()): return False except: raise ValueError("bad pattern") last_groups[:] += Mat.groups() elif editDistance(expected.rstrip(), actual.rstrip()) > output_tolerance: return False return True def reportDetails(test, included_files, line_num): if show is None: return if show <= 0: print(" Limit on error details exceeded.") return direct = dirname(test) print(" Error on line {} of {}".format(line_num, basename(test))) for base in [basename(test)] + included_files: full = join(dirname(test), base) print(("-" * 20 + " {} " + "-" * 20).format(base)) text_lines = list(enumerate(re.split(r'\n\r?', contents(full))))[:-1] fmt = "{{:{}d}}. {{}}".format(round(log(len(text_lines), 10))) text = '\n'.join(map(lambda p: fmt.format(p[0] + 1, p[1]), text_lines)) print(text) print("-" * (42 + len(base))) def chop_nl(s): if s and s[-1] == '\n': return s[:-1] else: return s def line_reader(f, prefix): n = 0 try: with open(f) as inp: while True: L = inp.readline() if L == '': return n += 1 included_file = yield (prefix + str(n), L) if included_file: yield None yield from line_reader(included_file, prefix + str(n) + ".") except FileNotFoundError: raise ValueError("file {} not found".format(f)) def doTest(test): last_groups = [] base = splitext(basename(test))[0] print("{}:".format(base), end=" \n") cdir = tmpdir = createTempDir(base) if verbose: print("Testing directory: {}".format(tmpdir)) timeout = TIMEOUT defns = {} def do_substs(L): c = 0 L0 = None while L0 != L and c < 10: c += 1 L0 = L L = re.sub(r'\$\{(.*?)\}', subst_var, L) return L def subst_var(M): key = M.group(1) if Match(r'\d+$', key): try: return last_groups[int(key)] except IndexError: raise ValueError("FAILED (nonexistent group: {{{}}})" .format(key)) elif M.group(1) in defns: return defns[M.group(1)] else: raise ValueError("undefined substitution: ${{{}}}".format(M.group(1))) try: line_num = None inp = line_reader(test, '') included_files = [] while True: line_num, line = next(inp, (line_num, '')) if line == "": print("OK") return True if not Match(r'\s*#', line): line = do_substs(line) if verbose: print("+ {}".format(line.rstrip())) if Match(r'\s*#', line) or Match(r'\s+$', line): pass elif Match(r'I\s+(\S+)', line): inp.send(join(dirname(test), Group(1))) included_files.append(Group(1)) elif Match(r'C\s*(\S*)', line): if Group(1) == "": cdir = tmpdir else: cdir = join(tmpdir, Group(1)) if not exists(cdir): mkdir(cdir) elif Match(r'T\s*(\S+)', line): try: timeout = float(Group(1)) except: ValueError("bad time: {}".format(line)) elif Match(r'\+\s*(\S+)\s+(\S+)', line): doCopy(Group(1), Group(2), cdir) elif Match(r'-\s*(\S+)', line): doDelete(Group(1), cdir) elif Match(r'>\s*(.*)', line): cmnd = Group(1) expected = [] while True: line_num, L = next(inp, (line_num, '')) if L == '': raise ValueError("unterminated command: {}" .format(line)) L = L.rstrip() if Match(r'<<<(\*?)', L): is_regexp = Group(1) break expected.append(do_substs(L)) msg, out = doExecute(cmnd, cdir, timeout) if verbose: if out: print(re.sub(r'(?m)^', '- ', chop_nl(out))) if msg == "OK": if not correctProgramOutput(expected, out, last_groups, is_regexp): msg = "incorrect output" elif msg == "User Exit": print("Exiting Debug mode ...") break if msg != 'OK': print("ERROR ({})".format(msg)) reportDetails(test, included_files, line_num) return False elif Match(r'=\s*(\S+)\s+(\S+)', line): if not correctFileOutput(Group(1), Group(2), cdir): print("ERROR (file {} has incorrect content)" .format(Group(1))) reportDetails(test, included_files, line_num) return False elif Match(r'\*\s*(\S+)', line): if fileExists(Group(1), cdir): print("ERROR (file {} present)".format(Group(1))) reportDetails(test, included_files, line_num) return False elif Match(r'E\s*(\S+)', line): if not fileExists(Group(1), cdir): print("ERROR (file or directory {} not present)" .format(Group(1))) reportDetails(test, included_files, line_num) return False elif Match(r'(?s)D\s*([a-zA-Z_][a-zA-Z_0-9]*)\s*"(.*)"\s*$', line): defns[Group(1)] = Group(2) else: raise ValueError("bad test line at {}".format(line_num)) finally: if not keep: cleanTempDir(tmpdir) else: print(f"\nDirectory state saved in {tmpdir}") if __name__ == "__main__": show = None keep = False prog_dir = None lib_dir = None verbose = False superverbose = False src_dir = 'src' gitlet_dir = join(dirname(abspath(getcwd())), "gitlet") output_tolerance = 0 try: opts, files = \ getopt(sys.argv[1:], '', ['show=', 'keep', 'lib=', 'verbose', 'src=', 'tolerance=', 'superverbose', 'debug']) for opt, val in opts: if opt == '--show': show = int(val) elif opt == "--keep": keep = True elif opt == "--lib": lib_dir = val elif opt == "--src": src_dir = val elif opt == "--verbose": verbose = True elif opt == "--tolerance": output_tolerance = int(val) elif opt == "--superverbose": superverbose = True elif opt == "--debug": DEBUG = True TIMEOUT = 100000 if prog_dir is None: prog_dir = abspath(getcwd()) k = 10 while k > 0 and access(prog_dir, W_OK): k -= 1 if exists(join(prog_dir, 'gitlet', 'Main.class')): break prog_dir = dirname(prog_dir) else: print("Could not find gitlet.Main.", file=sys.stderr) sys.exit(1) except GetoptError: Usage() if not files: print(USAGE) sys.exit(0) ON_WINDOWS = Match(r'.*\\', join('a', 'b')) if ON_WINDOWS: environ['CLASSPATH'] = "{};{}".format(prog_dir, environ['CLASSPATH']) else: environ['CLASSPATH'] = "{}:{}".format(prog_dir, environ['CLASSPATH']) JAVA_COMMAND = 'exec ' + JAVA_COMMAND matching_files = [] for path in files: matching_files += glob(path) files = matching_files num_tests = len(files) errs = 0 fails = 0 print(DEBUG_MSG) for test in files: try: if not exists(test): num_tests -= 1 elif not doTest(test): errs += 1 if type(show) is int: show -= 1 except ValueError as excp: print("FAILED ({})".format(excp.args[0])) fails += 1 cleanTempDir(join(abspath(getcwd()), "gitlet")) print() print("Ran {} tests. ".format(num_tests), end="") if errs == fails == 0: print("All passed.") else: print("{} passed.".format(num_tests - errs - fails)) sys.exit(1)