# customBashShell / pexpect-dpty / proc_check.py
#####
#
# Collection of handy routines for testing shells
#
# DO NOT change ANY of the code.  It will break the relation between
# the tests run by you and us running your tests for grading.
# 
#
# Include this file with "import proc_check"
#
#####
import sys
import os, re, pexpect, time

def check_pid_fgpgrp(pid):
    """
    Check whether a process belongs to its terminal's foreground group.

    Reads the first line of /proc/<pid>/stat and compares the process
    group id (pgrp) with the foreground process group id of the
    controlling terminal (tpgid).

    pid -- the process id, given as a string or as an integer.

    Returns True if pgrp and tpgid are equal, False otherwise.  A process
    without a controlling terminal reports a tpgid of -1 and therefore
    yields False.

    Raises IOError (OSError on Python 3) if the stat file cannot be opened,
    and ValueError if the line does not have the expected layout.
    """

    # 'with' guarantees the file is closed, even if reading fails.
    with open('/proc/' + str(pid) + '/stat', 'r') as f:
        stat_str = f.readline()

    # Layout: pid (comm) state ppid pgrp session tty_nr tpgid ...
    # comm may contain spaces, so it is matched through its parentheses;
    # tpgid may be negative, so an optional minus sign is accepted.
    grps = re.match(r"\d+ \(.*\) . \d+ (\d+) \d+ \d+ (-?\d+)", stat_str)
    if grps is None:
        raise ValueError('unexpected /proc/<pid>/stat format: %r' % stat_str)

    return grps.group(1) == grps.group(2)

def check_pid_status(pid, pid_status):
    """
    Compare the one-character state of a process with an expected value.

    Reads the first line of /proc/<pid>/stat, extracts the state character
    (the field following the parenthesised command name) and compares it
    with pid_status.

    pid        -- the process id, given as a string or as an integer.
    pid_status -- the expected one-character state, e.g. 'R', 'S' or 'T'.

    Returns True if the states are the same, False if they differ.

    Raises IOError (OSError on Python 3) if the stat file cannot be opened,
    and ValueError if the line does not have the expected layout.
    """

    # 'with' guarantees the file is closed, even if reading fails.
    with open('/proc/' + str(pid) + '/stat', 'r') as f:
        stat_str = f.readline()

    # Layout: pid (comm) state ppid pgrp session tty_nr tpgid ...
    # comm may contain spaces, so it is matched through its parentheses;
    # tpgid may be negative, so an optional minus sign is accepted.
    grps = re.match(r"\d+ \(.*\) (.) \d+ \d+ \d+ \d+ -?\d+", stat_str)
    if grps is None:
        raise ValueError('unexpected /proc/<pid>/stat format: %r' % stat_str)

    return grps.group(1) == pid_status
    

def count_active_children(pexpect_mod, num_children_expected):
    """
    Count the processes whose parent is the provided shell and assert that
    the count matches the expected number.

    Every numbered entry of /proc is inspected once; a process counts as a
    child when the ppid field of its stat file equals pexpect_mod.pid.

    pexpect_mod           -- object exposing the shell's process id as .pid
    num_children_expected -- the exact number of children expected (>= 0)

    Returns nothing.  Raises AssertionError if num_children_expected is
    negative, or if fewer or more children are found than expected.
    """

    assert num_children_expected >= 0, 'ERROR: invalid input'

    shell_pid = pexpect_mod.pid
    num_children_found = 0

    for entry in os.listdir("/proc/"):
        try:
            int(entry)
        except ValueError:
            # it wasn't a numbered process directory
            continue

        try:
            with open('/proc/' + entry + '/stat', 'r') as f:
                stat_str = f.readline()
        except (IOError, OSError):
            # the process finished before its stat file could be read
            continue

        # pulling the 4th field, the ppid, from the /proc/pid/stat file;
        # the command name may contain spaces, so match its parentheses
        grps = re.match(r"\d+ \(.*\) . (\d+)", stat_str)
        if grps is not None and int(grps.group(1)) == shell_pid:
            num_children_found += 1

    assert num_children_found >= num_children_expected, \
        'ERROR:  Fewer children found in process than expected'
    assert num_children_found == num_children_expected, \
        'ERROR:  More children found in process than expected'

def count_children_timeout(pexpect_mod, num_children_expected, timeout = -1):
    """
    Collect the distinct processes whose parent is the provided shell,
    polling /proc repeatedly until the timeout period ends, then assert
    that the number collected matches the expected number.

    pexpect_mod           -- object exposing the shell's process id as .pid
                             and a default timeout (in seconds) as .timeout
    num_children_expected -- the exact number of children expected (>= 0)
    timeout               -- polling period in seconds; -1 (the default)
                             means use pexpect_mod.timeout

    Returns the list of child pids, in the order they were first seen.

    Raises AssertionError if no timeout is available, if
    num_children_expected is negative, or if fewer or more children are
    found than expected.
    """

    assert timeout is not None, \
        'Error:  Must include timeout or allow default timeout'
    assert num_children_expected >= 0, \
        'ERROR: input for num_children_expected invalid'

    # The default timeout is the timeout of the pexpect module
    if timeout == -1:
        timeout = pexpect_mod.timeout
    # the pexpect default may itself be None, which cannot bound the loop
    assert timeout is not None, \
        'Error:  Must include timeout or allow default timeout'
    end_time = time.time() + timeout

    shell_pid = pexpect_mod.pid
    children_found = []
    seen = set()    # same pids as children_found, for fast membership tests

    # timeout loop, always runs for timeout seconds
    while timeout > 0:
        # minor sleep, to lessen busy-waiting processing
        time.sleep(0.010)
        for entry in os.listdir("/proc/"):
            try:
                int(entry)
            except ValueError:
                # it wasn't a numbered process directory
                continue

            try:
                with open('/proc/' + entry + '/stat', 'r') as f:
                    stat_str = f.readline()
            except (IOError, OSError):
                # the process finished before its stat file could be read
                continue

            # pulling the first and fourth fields, the pid and ppid; the
            # command name may contain spaces, so match its parentheses
            grps = re.match(r"(\d+) \(.*\) . (\d+)", stat_str)
            if grps is None:
                continue
            pid = int(grps.group(1))
            ppid = int(grps.group(2))

            if ppid == shell_pid and pid not in seen:
                seen.add(pid)
                children_found.append(pid)

        timeout = end_time - time.time()

    num_children_found = len(children_found)
    assert num_children_found >= num_children_expected, \
        'ERROR:  Fewer children found in process than expected'
    assert num_children_found == num_children_expected, \
        'ERROR:  More children found in process than expected'
    return children_found

def wait_until_child_is_in_foreground(pexpect_mod, timeout = -1):
    """
    Wait until a process group other than the shell's owns the pty.

    Polls os.tcgetpgrp() on pexpect_mod.child_fd every 10ms for as long as
    the foreground process group is still pexpect_mod.pid.  Once another
    group has taken over the terminal, check_foreground_process() is used
    to assert that the new owner is a proper child of the shell.

    There is a chance that a program could start and finish within one
    10ms sleep, and as such tests should be designed to not wait for
    extremely fast commands.

    pexpect_mod -- object exposing .pid, .child_fd and .timeout
    timeout     -- seconds to wait; -1 (the default) means use
                   pexpect_mod.timeout, None means wait without a limit

    Raises pexpect.TIMEOUT if the terminal is not taken over in time, and
    AssertionError if the process that took it over fails the check.
    """

    # the default timeout is the timeout from the pexpect module
    if timeout == -1:
        timeout = pexpect_mod.timeout

    # The deadline follows the effective timeout, so an explicit argument
    # is honoured; None means there is no deadline at all.
    if timeout is None:
        end_time = None
    else:
        end_time = time.time() + timeout

    # busy wait with a sleep until the pty is taken over by a process
    while os.tcgetpgrp(pexpect_mod.child_fd) == pexpect_mod.pid:
        time.sleep(0.010)
        if end_time is not None and time.time() > end_time:
            raise pexpect.TIMEOUT('Timeout exceeded in '
                                  'wait_until_child_is_in_foreground().')

    process_pid = os.tcgetpgrp(pexpect_mod.child_fd)
    assert check_foreground_process(pexpect_mod.pid, process_pid), \
        'Error, terminal not taken over by child process properly'

def check_foreground_process(parent_pid, process_pgrp):
    """
    Check the foreground process identified by
    wait_until_child_is_in_foreground.

    Reads /proc/<process_pgrp>/stat and verifies that the process:
    1.  Owns the tty (its pgrp field equals its tpgid field).
    2.  Is a child of the shell (its ppid field equals parent_pid).

    parent_pid   -- process id of the shell, as an integer
    process_pgrp -- id of the foreground process group; it is also used as
                    the pid whose stat file is read

    Returns True if both checks pass.  Returns False, after printing an
    explanation, if the stat file does not exist or cannot be read.

    Raises AssertionError if the stat line cannot be parsed or if either
    check fails.
    """

    stat_path = '/proc/' + str(process_pgrp) + '/stat'

    if not os.path.exists(stat_path):
        print("Error: The proc/<pid>/stat file for the process does not "
              "exist.  Run a command that won't have the process group "
              "leader quit early.")
        return False

    try:
        # 'with' closes the file even when a later assertion fails
        with open(stat_path, 'r') as f:
            stat_str = f.readline()
    except IOError:
        # the process may have exited between the existence check and here
        print("Error opening the proc/<pid>/stat file for the process.  "
              "Advise: attempt re-run.")
        return False

    # as per http://man7.org/linux/man-pages/man5/proc.5.html
    # pid (exename) state ppid pgrp session tty_nr tpgid
    # exename may contain spaces, so it is matched through its parentheses;
    # tpgid may be negative, so an optional minus sign is accepted.
    grps = re.match(r"\d+ \(.*\) . (\d+) (\d+) \d+ \d+ (-?\d+)", stat_str)
    assert grps is not None, \
        'Error, could not parse the proc/<pid>/stat file'
    ppid = grps.group(1)    # ppid
    pgrp = grps.group(2)    # pgrp
    tpgid = grps.group(3)   # tpgid

    if pgrp != tpgid:
        print("pid %d (ppid %s, pgrp %s) does not own tty. It's owned by %s"
              % (process_pgrp, ppid, pgrp, tpgid))

    assert pgrp == tpgid, 'Error, the process does not own the tty'

    assert int(ppid) == parent_pid, \
        'Error, process is not a child of the shell'

    return True