#####
#
# Collection of handy routines for testing shells
#
# DO NOT change ANY of the code. It will break the relation between
# the tests run by you and us running your tests for grading.
#
#
# Include this file with "import proc_check"
#
#####
import sys
import os, re, pexpect, time
def check_pid_fgpgrp(pid):
"""
Read the /proc/<pid>/stat file and check if the process group of the pid
is the foreground process group. If it is the fg pgrp, return true.
If it is not the fg pgrp, return false.
If there are errors opening the file, there will be an assertion error.
"""
f = open('/proc/' + pid + '/stat', 'r')
stat_str = f.readline()
grps = re.match("\d+ \S+ . \d+ (\d+) \d+ \d+ (\d+)", stat_str)
grps_1 = grps.group(1)
grps_2 = grps.group(2)
return grps_1 == grps_2
f.close()
def check_pid_status(pid, pid_status):
"""
Read the /proc/<pid>/stat file and check the one-character status of the
process. Compare that status to the provided pid_status. Return true
if the values are the same, return false if the process status and the
pid_status differ.
If there are errors opening the file, there will be an assertion error.
"""
f = open('/proc/' + pid + '/stat', 'r')
stat_str = f.readline()
grps = re.match("\d+ \S+ (.) \d+ \d+ \d+ \d+ \d+", stat_str)
grps_2 = grps.group(1)
return grps_2 == pid_status
f.close()
def count_active_children(pexpect_mod, num_children_expected):
"""
Count the number of process spawned by the provided shell that are
currently active. Error is asserted and the test is failed if
the number of children expected is not at least 0, and the number of
children discovered is not greater than or less than the number
expected.
"""
assert num_children_expected >= 0, 'ERROR: invalid input'
num_children_found = 0
for pid in os.listdir("/proc/"):
try:
pid = int(pid)
try:
f = open('/proc/' + str(pid) + '/stat', 'r')
stat_str = f.readline()
#pulling the 4th number, the ppid, from the /proc/pid/stat file
grps = re.match("\d+ \S+ . (\d+)", stat_str)
ppid = int(grps.group(1))
if ppid == pexpect_mod.pid:
num_children_found = num_children_found + 1
except:
#error from failing to open the file
#i.e. the process finished before being read
pass
except:
#error from converting the filename to an integer
#i.e. it wasn't a numbered process file
pass
assert num_children_found >= num_children_expected, \
'ERROR: Fewer children found in process than expected'
assert num_children_found == num_children_expected, \
'ERROR: More children found in process than expected'
def count_children_timeout(pexpect_mod, num_children_expected, timeout = -1):
"""
Count the number of process spawned by the provided shell,
until the timeout period ends. Error is asserted and the test is failed if
there is no timeout provided, the number of children expected is not at
least 0, and the number of children discovered is not greater than or less
than the number expected.
"""
assert timeout is not None, \
'Error: Must include timeout or allow default timeout'
assert num_children_expected >= 0, \
'ERROR: input for num_children_expected invalid'
#The default timeout is the timeout of the pexpect module
if timeout == -1:
timeout = pexpect_mod.timeout
end_time = time.time() + timeout
num_children_found = 0
children_found = []
#timeout loop, always runs for timeout seconds
while timeout > 0:
#minor sleep, to lessen busy-waiting processing
time.sleep(0.010)
for pid in os.listdir("/proc/"):
try:
pid = int(pid)
try:
f = open('/proc/' + str(pid) + '/stat', 'r')
stat_str = f.readline()
#pulling the first and fourth numbers, the pid and ppid
grps = re.match("(\d+) \S+ . (\d+)", stat_str)
pid = int(grps.group(1))
ppid = int(grps.group(2))
if ppid == pexpect_mod.pid and children_found.count(pid) == 0:
num_children_found = num_children_found + 1
children_found.append(pid)
except:
#error from failing to open the file
#i.e. the process finished before being read
pass
except:
#error from converting the filename to an integer
#i.e. it wasn't a numbered process file
pass
timeout = end_time - time.time()
assert num_children_found >= num_children_expected, \
'ERROR: Fewer children found in process than expected'
assert num_children_found == num_children_expected, \
'ERROR: More children found in process than expected'
return children_found
def wait_until_child_is_in_foreground(pexpect_mod, timeout = -1):
"""
This begins a timeout loop that waits for a process id other than that
of the shell (self.pid) to take over the pty. A timeout is raised if the
terminal is not taken over, and the while loop quits if the terminal is
taken over. There is a 10ms sleep to slow the pace of the busy-waiting.
There is a chance that a program could start and finish in those said 10ms,
and as such tests should be designed to not wait for extremely fast
processing commands.
"""
#the default timeout is the timeout from the pexpect module
if timeout == -1:
timeout = pexpect_mod.timeout
if timeout is not None:
end_time = time.time() + pexpect_mod.timeout
#busy wait with a sleep until the pty is taken over by a process
while os.tcgetpgrp(pexpect_mod.child_fd) == pexpect_mod.pid:
time.sleep(0.010)
if timeout is not None:
timeout = end_time - time.time()
if timeout < 0 and timeout is not None:
raise pexpect.TIMEOUT ('Timeout exceeded in \
wait_until_child_is_in_foreground().')
process_pid = os.tcgetpgrp(pexpect_mod.child_fd)
assert check_foreground_process(pexpect_mod.pid, process_pid), \
'Error, terminal not taken over by child process properly'
def check_foreground_process(parent_pid, process_pgrp):
"""
Check the foreground process identified by
wait_until_child_is_in_foreground that it:
1. Owns the tty (implicit from how the process is identified)
2. Is a child of the shell.
"""
if os.path.exists('/proc/' + str(process_pgrp) + '/stat'):
try:
# as per http://man7.org/linux/man-pages/man5/proc.5.html
# pid (exename) state ppid pgrp session tty_nr tpgid
f = open('/proc/' + str(process_pgrp) + '/stat', 'r')
stat_str = f.readline()
grps = re.match("\d+ \S+ . (\d+) (\d+) \d+ \d+ (\d+)", stat_str)
ppid = grps.group(1) # ppid
pgrp = grps.group(2) # pgrp
tpgid = grps.group(3) # tpgid
if pgrp != tpgid:
print "pid %d (ppid %s, pgrp %s) does not own tty. It's owned by %s" % (process_pgrp, ppid, pgrp, tpgid)
assert pgrp == tpgid, 'Error, the process does not own the tty'
assert int(ppid) == parent_pid, \
'Error, process is not a child of the shell'
f.close()
return True
except IOError, e:
print "Error opening the proc/<pid>/stat file for the process. \
Advise: attempt re-run."
return False
else:
print "Error: The proc/<pid>/stat file for the process does not exist.\
Run a command that won't have the process group leader quit early."
return False