#
# test_spawn.py
#
# test script for distutils.spawn module
#
# GPW 1999/07/20
#
# $Id$
#

import sys, os, string
from tempfile import mktemp
from distutils.spawn import spawn
from distutils.util import abspath
from distutils.errors import *
from unittest import TestScenario, parse_args, run_scenarios
from util import try_it

class SpawnTest (TestScenario):

    def setup (self):
        """Prepare the fixtures shared by all test cases: the full path
        of the running interpreter ('self.python') and a handful of
        ready-made command lines, each stored as an argument list
        ('cmd_*') and, except for 'cmd_mixed', also as a single
        space-joined string ('cmd_*_s')."""
        interpreter = abspath (sys.executable)
        self.python = os.path.normpath (interpreter)

        # 'gen_cmd()' reads 'self.python', so the commands can only be
        # generated once it has been set.
        trivial = self.gen_cmd ('1')
        hello = self.gen_cmd ('print "hello"')
        complain = self.gen_cmd ('import sys; sys.stderr.write("foo\\n")')

        # Interleaves writes to stdout ("out" + "put") and stderr
        # ("err" + "or").
        mixed_script = ("import sys; "
                        "sys.stdout.write('out'); "
                        "sys.stderr.write('err'); "
                        "sys.stdout.write('put'); "
                        "sys.stderr.write('or')")

        self.cmd_trivial = trivial
        self.cmd_trivial_s = string.join (trivial)
        self.cmd_print = hello
        self.cmd_print_s = string.join (hello)
        self.cmd_err = complain
        self.cmd_err_s = string.join (complain)
        self.cmd_mixed = self.gen_cmd (mixed_script)


    def shutdown (self):
        """Tear-down counterpart of 'setup()'.  'setup()' only builds
        strings and lists, so there is nothing to release here."""
        return None


    def capture_output (self):
        """Temporarily redirect stdout and stderr (file descriptors 1
        and 2) to two temporary files, the contents of which will be
        returned by 'stop_capture()'.

        The original descriptors are duplicated into 'self.save_stdout'
        and 'self.save_stderr' so that 'stop_capture()' can put them
        back.  Raises 'os.error' if a descriptor cannot be duplicated."""

        # Push out anything already buffered at the Python level, so it
        # reaches the real stdout/stderr rather than the capture files.
        sys.stdout.flush()
        sys.stderr.flush()

        # NOTE(review): mktemp() only picks a name, so there is a window
        # between choosing it and opening the file; acceptable for a
        # test script, but worth replacing if a safer API is available.
        self.out_filename = mktemp()
        self.err_filename = mktemp()
        self.out_file = open(self.out_filename, 'w')
        self.err_file = open(self.err_filename, 'w')

        self.save_stdout = os.dup(1)
        self.save_stderr = os.dup(2)

        # dup2() replaces the target descriptor in one step.  Closing
        # fd 1/2 first and hoping dup() hands the same number back would
        # leave the process without a stdout/stderr if the dup failed or
        # landed on some other free descriptor.
        os.dup2(self.out_file.fileno(), 1)
        os.dup2(self.err_file.fileno(), 2)

    def stop_capture (self):
        """Undo 'capture_output()': restore the original stdout and
        stderr and return a tuple '(output, error)' of strings holding
        everything that was written to each of them while the capture
        was active.  The temporary capture files are deleted."""

        # Flush while fd 1/2 still point at the capture files, so that
        # Python-level buffered output ends up in the captured text.
        sys.stdout.flush()
        sys.stderr.flush()

        # Put the saved descriptors back with dup2(), then close the
        # saved duplicates -- otherwise every capture leaks two file
        # descriptors.
        os.dup2(self.save_stdout, 1)
        os.close(self.save_stdout)
        os.dup2(self.save_stderr, 2)
        os.close(self.save_stderr)

        self.out_file.close()
        self.err_file.close()

        output = self._read_and_remove(self.out_filename)
        error = self._read_and_remove(self.err_filename)

        return (output, error)

    def _read_and_remove (self, filename):
        """Return the whole contents of 'filename' as a string and
        delete the file, even if reading it fails."""
        try:
            capture = open(filename)
            try:
                return capture.read()
            finally:
                capture.close()
        finally:
            os.unlink (filename)


    #def capture_test_1 (self, test_method, args):
    #    """Run a 


    def test_out (self, code, output, error):
        """Evaluate 'code' in a forked child process and check what it
        writes to stdout and stderr.

        'code' is a string holding a Python expression; it is evaluated
        in the environment returned by 'get_caller_env()'.  'output'
        and 'error' are the exact strings the child is expected to
        write to stdout and stderr respectively.  One test is counted
        in 'self.tests_run' and the outcome is reported through
        'report_pass()' or 'report_fail()'.

        Only implemented where 'os.name' is 'posix' (it needs
        'os.fork()' and 'os.pipe()'); raises RuntimeError elsewhere."""
        # XXX I have no idea how to do this in a portable way!

        import traceback
        from unittest import get_caller_env
        # Keep this call directly inside 'test_out()': presumably
        # 'get_caller_env()' finds the environment by inspecting the
        # call stack, so moving it into a helper would change what it
        # returns -- TODO confirm against the local 'unittest' module.
        (caller_globals, caller_locals) = get_caller_env ()

        if os.name != 'posix':
            raise RuntimeError(
                "don't know how to capture stdout/stderr on platform '%s'"
                % os.name)

        # Flush before forking: the child inherits a copy of any
        # pending buffered output and would otherwise write it into
        # the pipes when it flushes its own streams.
        sys.stdout.flush()
        sys.stderr.flush()

        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        err_read, err_write = os.pipe()

        pid = os.fork()
        if pid == 0:                # in the child
            exit_status = 1
            try:
                try:
                    # Drop the parent's ends of the pipes; in particular
                    # the write end of our own stdin, so that anything
                    # reading stdin sees end-of-file.
                    os.close(p2cwrite)
                    os.close(c2pread)
                    os.close(err_read)

                    os.dup2(p2cread, 0)     # stdin
                    os.dup2(c2pwrite, 1)    # stdout
                    os.dup2(err_write, 2)   # stderr

                    eval (code, caller_globals, caller_locals)
                    exit_status = 0
                except:
                    # Deliberately catch everything: an exception that
                    # escaped here would let the child carry on running
                    # the parent's test suite.  The traceback goes to
                    # the stderr pipe, so the parent reports it.
                    traceback.print_exc()
                # os._exit() skips the normal interpreter shutdown, so
                # buffered output has to be flushed by hand.
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit (exit_status)

        # in the parent
        os.close(p2cread)
        # Nothing is ever sent to the child, so close its stdin right
        # away instead of holding it open while reading the output.
        os.close(p2cwrite)
        os.close(c2pwrite)
        os.close(err_write)
        child_stdout = os.fdopen (c2pread, 'r')
        child_stderr = os.fdopen (err_read, 'r')

        # NOTE(review): stdout is drained completely before stderr is
        # read; a child writing a large amount to stderr first could
        # block.  Fine for the short outputs tested in this file.
        child_out = child_stdout.read()
        child_err = child_stderr.read()

        child_stdout.close()
        child_stderr.close()
        os.waitpid (pid, 0)

        self.tests_run = self.tests_run + 1
        self._report_out (code, output, error, child_out, child_err)

    # test_out ()

    def _report_out (self, code, output, error, child_out, child_err):
        """Report one 'test_out()' result: pass if the child wrote
        exactly 'output' to stdout and 'error' to stderr, otherwise
        fail with a message saying which stream(s) differed."""
        out_ok = (output == child_out)
        err_ok = (error == child_err)

        if out_ok and err_ok:
            self.report_pass (
                code + "\n" +
                "    wrote %s to stdout\n" % repr(child_out) +
                "    and %s to stderr" % repr(child_err))
        elif err_ok:
            self.report_fail (
                code + "\n" +
                "        did not write %s to stdout (wrote %s)\n" %
                  (repr(output), repr(child_out)) +
                "        but wrote %s to stderr" % repr(child_err))
        elif out_ok:
            self.report_fail (
                code + "\n" +
                "        wrote %s to stdout\n" % repr(child_out) +
                "        but did not write %s to stderr (wrote %s)" %
                  (repr(error), repr(child_err)))
        else:
            self.report_fail (
                code + "\n" +
                "        did not write %s to stdout (wrote %s)\n" %
                  (repr(output), repr(child_out)) +
                "        and did not write %s to stderr (wrote %s)" %
                  (repr(error), repr(child_err)))
        


    def gen_cmd (self, script):
        """Build the argument list that runs 'script' through the
        Python interpreter with '-c', in the list form taken by
        'spawn()'.  For instance the script "print 'hello'" gives
        [PYTHON, "-c", "print 'hello'"], where PYTHON is the
        interpreter path held in 'self.python'."""
        argv = [self.python]
        argv.append ("-c")
        argv.append (str(script))
        return argv


    # Names of the test cases in this scenario; each one has a matching
    # 'check_NAME' method below.  NOTE(review): presumably the test
    # harness uses this list to find and order those methods -- confirm
    # against the local 'unittest' module.
    test_cases = ['simple', 'output', 'verbose', 'dryrun']


    def check_simple (self):
        "Simple spawns (success and failure): 6"

        # Commands that run to completion, then one that exits with a
        # non-zero status and must make spawn() raise.
        self.test_stmt ("spawn (%s)" % self.cmd_mixed)
        self.test_stmt ("spawn (%s)" % self.gen_cmd("import sys;sys.exit(0)"))
        self.test_exc ("spawn (%s)" % self.gen_cmd("import sys;sys.exit(42)"),
                       DistutilsExecError)

        # A program that does not exist: spawn() must raise, and the
        # complaint is expected on stderr only.  The try/finally makes
        # sure stdout/stderr are restored even if 'test_exc()' itself
        # blows up; otherwise all later output would silently vanish
        # into the capture files.
        self.capture_output()
        try:
            self.test_exc ("spawn (['vjwe9ghwe09fnkwef098vdsjn3209'])",
                           DistutilsExecError)
        finally:
            (output, error) = self.stop_capture()

        # NOTE(review): 'output' and 'error' are referred to by name in
        # the expression strings below, so presumably 'test_val()'
        # evaluates them in this method's namespace -- do not rename
        # these locals without checking the local 'unittest' module.
        exp_error = "unable to execute vjwe9ghwe09fnkwef098vdsjn3209:"
        self.test_val ("output", "")
        self.test_val ("error[0:%d]" % len(exp_error), exp_error)

    def check_output (self):
        "Spawn commands with output and error output: 3"

        self.test_out ("spawn (%s)" % self.cmd_print,
                       "hello\n", "")
        self.test_out ("spawn (%s)" % self.cmd_err,
                       "", "foo\n")
        self.test_out ("spawn (%s)" % self.cmd_mixed,
                       "output", "error")

    def check_verbose (self):
        "Verbose reporting of command executed: 2"

        self.test_out ("spawn (%s, verbose=1)" % self.cmd_trivial,
                       self.cmd_trivial_s + "\n", "")
        self.test_out ("spawn (%s, verbose=1)" % self.cmd_print,
                       self.cmd_print_s + "\n" + "hello\n",
                       "")

    def check_dryrun (self):
        "Spawn commands in dry-run mode (ie. don't actually do anything): "
        self.test_out ("spawn (%s, dry_run=1)" % self.cmd_print,
                       "", "")
        self.test_out ("spawn (%s, dry_run=1, verbose=1)" % self.cmd_print,
                       "%s -c %s\n" % (self.python, self.cmd_print_s),
                       "")
        
        
# Script entry point: hand control to the scenario runner.  'parse_args'
# and 'run_scenarios' are imported from the 'unittest' module at the top
# of this file.  sys.exit() raises SystemExit, so when this file is run
# as a script nothing below this block is ever executed.
if __name__ == "__main__":
    (scenarios, options) = parse_args()
    run_scenarios (scenarios, options)
    sys.exit()

# NOTE(review): everything from here on looks like an older, ad-hoc test
# script that predates the SpawnTest scenario above.  Because of the
# sys.exit() call it only runs when this module is *imported*, where it
# spawns real 'ls' processes as an import side effect -- confirm whether
# it is still wanted.

# Plain spawn, program given by absolute path.
spawn (["/bin/ls"])
print "ok: basic spawn with full path"

# Plain spawn, program given by bare name.
spawn (["ls"])
print "ok: basic spawn without full path"

# print "this will blow up, but I'll catch it:"
# try:
#     spawn (["ls"])
# except DistutilsExecError, msg:
#     print "caught DistutilsExecError: \"%s\"" % msg

# print "same again, but I'll wrap it in an eval for catching:"
# try:
#     eval ('spawn (["ls"])')
# except DistutilsExecError, msg:
#     print "caught DistutilsExecError: \"%s\"" % msg
    

#print "this should blow up:"
#spawn (["ls"])


# 'try_it' comes from the local 'util' module; presumably it evaluates
# the statement string and checks that the named exception is raised,
# using the last argument as a description -- TODO confirm.
try_it ('spawn (["ls"], search_path=0)', DistutilsExecError,
        "spawn without full path, not searching path")

# Spawn with extra command-line arguments.
spawn (["/bin/ls", "-l", sys.executable])
print "ok: spawn with args"

# Same command with verbose=1.
spawn (["/bin/ls", "-l", sys.executable], verbose=1)
print "ok: verbose spawn"

# Same command with verbose=1 and dry_run=1.
spawn (["/bin/ls", "-l", sys.executable], verbose=1, dry_run=1)
print "ok: verbose, dry-run spawn"

# 'ls' is handed a file name that should not exist; the call is expected
# to end in DistutilsExecError.
try_it ('spawn (["/bin/ls", "aosjhfjhdsaf"])',
        DistutilsExecError, "spawn with bogus argument")
