# Design (and some implementation) of this owes heavily to Click's
# CliRunner (TODO: bring in license?)
"""Porting notes:
* EchoingStdin.read1() needed to exist for py3 and raw_input
* Not sure why the isolate context manager deals in byte streams and
then relegates to the Result to do late encoding (in properties no
less). This is especially troublesome because sys.stdout/stderr
isn't the same stream as stdout/stderr as returned by the context
manager. (see the extra flush calls in run's finally block.) Is
it just for parity with py2? There was a related bug, sys.stdout was
flushed, but not sys.stderr, which caused py3's error_bytes to come
through as blank.
* sys.stderr had to be flushed, too, on py3 (in invoke's finally)
* Result.exception was redundant with exc_info
* Result.stderr raised a ValueError when stderr was empty, not just
when it wasn't captured.
* Instead of isolated_filesystem, I just added chdir to run,
because pytest already does temporary directories.
* Removed echo_stdin (stdin never echos, as it wouldn't with subprocess)
"""
import os
import sys
import shlex
import getpass
import contextlib
from subprocess import list2cmdline
from functools import partial
try:
from collections.abc import Container
except ImportError:
from collections import Container
PY2 = sys.version_info[0] == 2
if PY2:
from cStringIO import StringIO
else:
import io
unicode = str
from boltons.setutils import complement
def _make_input_stream(input, encoding):
if input is None:
input = b''
elif isinstance(input, unicode):
input = input.encode(encoding)
elif not isinstance(input, bytes):
raise TypeError('expected bytes, text, or None, not: %r' % input)
if PY2:
return StringIO(input)
return io.BytesIO(input)
def _fake_getpass(prompt='Password: ', stream=None):
if not stream:
stream = sys.stderr
input = sys.stdin
prompt = str(prompt)
if prompt:
stream.write(prompt)
stream.flush()
line = input.readline()
if not line:
raise EOFError
if line[-1] == '\n':
line = line[:-1]
return line
[docs]class RunResult(object):
"""Returned from :meth:`CommandChecker.run()`, complete with the
relevant inputs and outputs of the run.
Instances of this object are especially valuable for verifying
expected output via the :attr:`~RunResult.stdout` and
:attr:`~RunResult.stderr` attributes.
API modeled after :class:`subprocess.CompletedProcess` for
familiarity and porting of tests.
"""
def __init__(self, args, input, exit_code, stdout_bytes, stderr_bytes, exc_info=None, checker=None):
self.args = args
self.input = input
self.checker = checker
self.stdout_bytes = stdout_bytes
self.stderr_bytes = stderr_bytes
self.exit_code = exit_code # integer
# if an exception occurred:
self.exc_info = exc_info
# TODO: exc_info won't be available in subprocess... maybe the
# text of a parsed-out traceback? But in general, tracebacks
# aren't usually part of CLI UX...
@property
def exception(self):
"""Exception instance, if an uncaught error was raised.
Equivalent to ``run_res.exc_info[1]``, but more readable."""
return self.exc_info[1] if self.exc_info else None
@property
def returncode(self): # for parity with subprocess.CompletedProcess
"Alias of :attr:`exit_code`, for parity with :class:`subprocess.CompletedProcess`"
return self.exit_code
@property
def stdout(self):
"""The text output ("stdout") of the command, as a decoded
string. See :attr:`stdout_bytes` for the bytestring.
"""
return (self.stdout_bytes
.decode(self.checker.encoding, 'replace')
.replace('\r\n', '\n'))
@property
def stderr(self):
"""The error output ("stderr") of the command, as a decoded
string. See :attr:`stderr_bytes` for the bytestring. May be
``None`` if *mix_stderr* was set to ``True`` in the
:class:`~face.CommandChecker`.
"""
if self.stderr_bytes is None:
raise ValueError("stderr not separately captured")
return (self.stderr_bytes
.decode(self.checker.encoding, 'replace')
.replace('\r\n', '\n'))
def __repr__(self):
# very similar to subprocess.CompleteProcess repr
args = ['args={!r}'.format(self.args),
'returncode={!r}'.format(self.returncode)]
if self.stdout_bytes:
args.append('stdout=%r' % (self.stdout,))
if self.stderr_bytes is not None:
args.append('stderr=%r' % (self.stderr,))
if self.exception:
args.append('exception=%r' % (self.exception,))
return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
def _get_exp_code_text(exp_codes):
try:
codes_len = len(exp_codes)
except Exception:
comp_codes = complement(exp_codes)
try:
comp_codes = tuple(comp_codes)
return 'any code but %r' % (comp_codes[0] if len(comp_codes) == 1 else comp_codes)
except Exception:
return repr(exp_codes)
if codes_len == 1:
return repr(exp_codes[0])
return 'one of %r' % (tuple(exp_codes),)
[docs]class CheckError(AssertionError):
"""Rarely raised directly, :exc:`CheckError` is automatically
raised when a :meth:`CommandChecker.run()` call does not terminate
with an expected error code.
This error attempts to format the stdout, stderr, and stdin of the
run for easier debugging.
"""
def __init__(self, result, exit_codes):
self.result = result
exp_code = _get_exp_code_text(exit_codes)
msg = ('Got exit code %r (expected %s) when running command: %s'
% (result.exit_code, exp_code, list2cmdline(result.args)))
if result.stdout:
msg += '\nstdout = """\n'
msg += result.stdout
msg += '"""\n'
if result.stderr_bytes:
msg += '\nstderr = """\n'
msg += result.stderr
msg += '"""\n'
if result.input:
msg += '\nstdin = """\n'
msg += result.input
msg += '"""\n'
AssertionError.__init__(self, msg)
[docs]class CommandChecker(object):
"""Face's main testing interface.
Wrap your :class:`Command` instance in a :class:`CommandChecker`,
:meth:`~CommandChecker.run()` commands with arguments, and get
:class:`RunResult` objects to validate your Command's behavior.
Args:
cmd: The :class:`Command` instance to test.
env (dict): An optional base environment to use for subsequent
calls issued through this checker. Defaults to ``{}``.
chdir (str): A default path to execute this checker's commands
in. Great for temporary directories to ensure test isolation.
mix_stderr (bool): Set to ``True`` to capture stderr into
stdout. This makes it easier to verify order of standard
output and errors. If ``True``, this checker's results'
error_bytes will be set to ``None``. Defaults to ``False``.
reraise (bool): Reraise uncaught exceptions from within *cmd*'s
endpoint functions, instead of returning a :class:`RunResult`
instance. Defaults to ``False``.
"""
def __init__(self, cmd, env=None, chdir=None, mix_stderr=False, reraise=False):
self.cmd = cmd
self.base_env = env or {}
self.reraise = reraise
self.mix_stderr = mix_stderr
self.encoding = 'utf8' # not clear if this should be an arg yet
self.chdir = chdir
@contextlib.contextmanager
def _isolate(self, input=None, env=None, chdir=None):
old_cwd = os.getcwd()
old_stdin, old_stdout, old_stderr = sys.stdin, sys.stdout, sys.stderr
old_getpass = getpass.getpass
tmp_stdin = _make_input_stream(input, self.encoding)
full_env = dict(self.base_env)
chdir = chdir or self.chdir
if env:
full_env.update(env)
if PY2:
tmp_stdout = bytes_output = StringIO()
if self.mix_stderr:
tmp_stderr = tmp_stdout
else:
bytes_error = tmp_stderr = StringIO()
else:
bytes_output = io.BytesIO()
tmp_stdin = io.TextIOWrapper(tmp_stdin, encoding=self.encoding)
tmp_stdout = io.TextIOWrapper(
bytes_output, encoding=self.encoding)
if self.mix_stderr:
tmp_stderr = tmp_stdout
else:
bytes_error = io.BytesIO()
tmp_stderr = io.TextIOWrapper(
bytes_error, encoding=self.encoding)
old_env = {}
try:
_sync_env(os.environ, full_env, old_env)
if chdir:
os.chdir(str(chdir))
sys.stdin, sys.stdout, sys.stderr = tmp_stdin, tmp_stdout, tmp_stderr
getpass.getpass = _fake_getpass
yield (bytes_output, bytes_error if not self.mix_stderr else None)
finally:
if chdir:
os.chdir(old_cwd)
_sync_env(os.environ, old_env)
# see note above
tmp_stdout.flush()
tmp_stderr.flush()
sys.stdin, sys.stdout, sys.stderr = old_stdin, old_stdout, old_stderr
getpass.getpass = old_getpass
return
[docs] def fail(self, *a, **kw):
"""Convenience method around :meth:`~CommandChecker.run()`, with the
same signature, except that this will raise a
:exc:`CheckError` if the command completes with exit code
``0``.
"""
kw.setdefault('exit_code', complement(set([0])))
return self.run(*a, **kw)
def __getattr__(self, name):
if not name.startswith('fail_'):
return super(CommandChecker, self).__getattr__(name)
_, _, code_str = name.partition('fail_')
try:
code = [int(cs) for cs in code_str.split('_')]
except Exception:
raise AttributeError('fail_* shortcuts must end in integers, not %r'
% code_str)
return partial(self.fail, exit_code=code)
[docs] def run(self, args, input=None, env=None, chdir=None, exit_code=0):
"""The :meth:`run` method acts as the primary entrypoint to the
:class:`CommandChecker` instance. Pass arguments as a list or
string, and receive a :class:`RunResult` with which to verify
your command's output.
If the arguments do not result in an expected *exit_code*, a
:exc:`CheckError` will be raised.
Args:
args: A list or string representing arguments, as one might
find in :attr:`sys.argv` or at the command line.
input (str): A string (or list of lines) to be passed to
the command's stdin. Used for testing
:func:`~face.prompt` interactions, among others.
env (dict): A mapping of environment variables to apply on
top of the :class:`CommandChecker`'s base env vars.
chdir (str): A string (or stringifiable path) path to
switch to before running the command. Defaults to
``None`` (runs in current directory).
exit_code (int): An integer or list of integer exit codes
expected from running the command with *args*. If the
actual exit code does not match *exit_code*,
:exc:`CheckError` is raised. Set to ``None`` to disable
this behavior and always return
:class:`RunResult`. Defaults to ``0``.
.. note::
At this time, :meth:`run` interacts with global process
state, and is not designed for parallel usage.
"""
if isinstance(input, (list, tuple)):
input = '\n'.join(input)
if exit_code is None:
exit_codes = ()
elif isinstance(exit_code, int):
exit_codes = (exit_code,)
elif not isinstance(exit_code, Container):
raise TypeError('expected exit_code to be None, int, or'
' Container of ints, representing expected'
' exit_codes, not: %r' % (exit_code,))
else:
exit_codes = exit_code
with self._isolate(input=input, env=env, chdir=chdir) as (stdout, stderr):
exc_info = None
exit_code = 0
if isinstance(args, (str, unicode)):
args = shlex.split(args)
try:
res = self.cmd.run(args or ())
except SystemExit as se:
exc_info = sys.exc_info()
exit_code = se.code if se.code is not None else 0
except Exception:
if self.reraise:
raise
exit_code = -1 # TODO: something better?
exc_info = sys.exc_info()
finally:
sys.stdout.flush()
sys.stderr.flush()
stdout_bytes = stdout.getvalue()
stderr_bytes = stderr.getvalue() if not self.mix_stderr else None
run_res = RunResult(checker=self,
args=args,
input=input,
stdout_bytes=stdout_bytes,
stderr_bytes=stderr_bytes,
exit_code=exit_code,
exc_info=exc_info)
if exit_codes and exit_code not in exit_codes:
exc = CheckError(run_res, exit_codes)
raise exc
return run_res
# syncing os.environ (as opposed to modifying a copy and setting it
# back) takes care of cases when someone has a reference to environ
def _sync_env(env, new, backup=None):
if PY2:
# py2 expects bytes in os.environ
encode = lambda x: x.encode('utf8') if isinstance(x, unicode) else x
new = {encode(k): encode(v) for k, v in new.items()}
for key, value in new.items():
if backup is not None:
backup[key] = env.get(key)
if value is not None:
env[key] = value
continue
try:
del env[key]
except Exception:
pass
return backup