"""Exercise multi-stage pipelines in the shell under test.

Covers plain pipelines, pipelines fed from an input file, and pipelines whose
last stage is redirected into an output file.
"""
from testutils import *

# Imported explicitly instead of relying on the star import above to
# re-export them.
import os
import time
from tempfile import mkstemp


def _make_tempfile():
    """Create an empty temporary file and return its path.

    mkstemp() returns an already-open OS-level descriptor together with the
    path. It is closed here so the test does not leak one descriptor per
    temporary file; the file is re-opened by name (or by the shell) later.
    """
    fd, path = mkstemp()
    os.close(fd)
    return path


setup_tests()
expect_prompt()

# Paths cleaned up in the `finally` clause. A value of None means "nothing to
# remove" (either not created yet or already removed explicitly).
yes = None
tmpfile = None
tmpout = None
try:
    tmpfile = _make_tempfile()
    with open(tmpfile, 'w') as fd:
        fd.write('123 456 789 10\n')

    # os.path.join keeps the path relative when __file__ has no directory
    # component (plain concatenation would yield "/yes.c").
    yes_source = os.path.join(os.path.dirname(__file__), "yes.c")
    with open(yes_source) as src:
        # The returned value is used as a command name below and is removed
        # during cleanup -- presumably the path of the built program.
        yes = make_test_program(src.read())

    message = '''Setup a multiple pipeline from an input file'''
    sendline('sed s/123/hello/ < {0} | sed s/456/world/ | rev'.format(tmpfile))
    expect('01 987 dlrow olleh', message)
    expect_prompt()
    removefile(tmpfile)
    tmpfile = None  # already removed; do not remove it again during cleanup

    # the following command should (a) complete and (b) output 333
    # this test tends to fail if the parent retains pipe file descriptors
    # because 'wc' then doesn't exit when head exits and closes the pipe
    sendline('{0} test | head -n 333 | wc -l'.format(yes))
    expect("333")
    expect_prompt()

    # now repeat this combined with I/O redirection ending the pipeline
    tmpfile = '/tmp/{0}.{1}.txt'.format(int(time.time() * 1000), os.getuid())
    sendline('{1} test | head -n 3 | wc -l > {0}'.format(tmpfile, yes))
    expect_prompt()

    with open(tmpfile) as fd:
        assert '3' == fd.read().strip()

    # Input redirection at the head and output redirection at the tail of
    # the same pipeline. The same path is reused for the new input file.
    removefile(tmpfile)
    expected_words = 'this is a bunch of words words'.split()
    with open(tmpfile, 'w') as fd:
        for word in expected_words * 2:
            fd.write(word + '\n')

    tmpout = _make_tempfile()
    sendline('sort < {0} | uniq | rev > {1}'.format(tmpfile, tmpout))
    expect_prompt()

    with open(tmpout) as fd:
        # Build a real list: on Python 3 a map object never compares equal
        # to a list, which would make this assertion fail unconditionally.
        lines = [line.strip() for line in fd]
    assert lines == 'a hcnub si fo siht sdrow'.split()

    removefile(tmpfile)
    tmpfile = None
    removefile(tmpout)
    tmpout = None

    # Large output redirected to a file, then read back through a second
    # pipeline: 100000 lines -> "100000" -> reversed "000001".
    tmpfile = _make_tempfile()
    sendline('{1} test | head -n 100000 > {0}'.format(tmpfile, yes))
    expect_prompt()

    tmpout = _make_tempfile()
    sendline('wc -l < {0} | rev > {1}'.format(tmpfile, tmpout))
    expect_prompt()

    with open(tmpout) as fd:
        assert fd.read().strip() == '000001'
finally:
    # Remove whatever is still outstanding, whether the test passed or not.
    for path in (yes, tmpfile, tmpout):
        if path is not None:
            removefile(path)

test_success()