from __future__ import absolute_import, division import contextlib import itertools import logging import sys import time from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR from pip._internal.utils.compat import WINDOWS from pip._internal.utils.logging import get_indentation from pip._internal.utils.typing import MYPY_CHECK_RUNNING if MYPY_CHECK_RUNNING: from typing import IO, Iterator logger = logging.getLogger(__name__) class SpinnerInterface(object): def spin(self): # type: () -> None raise NotImplementedError() def finish(self, final_status): # type: (str) -> None raise NotImplementedError() class InteractiveSpinner(SpinnerInterface): def __init__(self, message, file=None, spin_chars="-\\|/", # Empirically, 8 updates/second looks nice min_update_interval_seconds=0.125): # type: (str, IO[str], str, float) -> None self._message = message if file is None: file = sys.stdout self._file = file self._rate_limiter = RateLimiter(min_update_interval_seconds) self._finished = False self._spin_cycle = itertools.cycle(spin_chars) self._file.write(" " * get_indentation() + self._message + " ... ") self._width = 0 def _write(self, status): # type: (str) -> None assert not self._finished # Erase what we wrote before by backspacing to the beginning, writing # spaces to overwrite the old text, and then backspacing again backup = "\b" * self._width self._file.write(backup + " " * self._width + backup) # Now we have a blank slate to add our status self._file.write(status) self._width = len(status) self._file.flush() self._rate_limiter.reset() def spin(self): # type: () -> None if self._finished: return if not self._rate_limiter.ready(): return self._write(next(self._spin_cycle)) def finish(self, final_status): # type: (str) -> None if self._finished: return self._write(final_status) self._file.write("\n") self._file.flush() self._finished = True # Used for dumb terminals, non-interactive installs (no tty), etc. 
# We still print updates occasionally (once every 60 seconds by default) to
# act as a keep-alive for systems like Travis-CI that take lack-of-output as
# an indication that a task has frozen.
class NonInteractiveSpinner(SpinnerInterface):
    """Spinner that reports progress as log records instead of animating."""

    def __init__(self, message, min_update_interval_seconds=60):
        # type: (str, float) -> None
        """Remember ``message`` and immediately log a "started" update."""
        self._message = message
        self._finished = False
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._update("started")

    def _update(self, status):
        # type: (str) -> None
        """Log ``status`` at INFO level and restart the rate limiter."""
        assert not self._finished
        self._rate_limiter.reset()
        logger.info("%s: %s", self._message, status)

    def spin(self):
        # type: () -> None
        """Log a "still running" update unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._update("still running...")

    def finish(self, final_status):
        # type: (str) -> None
        """Log the final status once; later calls are ignored."""
        if self._finished:
            return
        summary = "finished with status '{final_status}'".format(
            final_status=final_status)
        self._update(summary)
        self._finished = True


class RateLimiter(object):
    """Tells callers whether enough time has passed since the last reset."""

    def __init__(self, min_update_interval_seconds):
        # type: (float) -> None
        """Store the minimum interval; the last-update time starts at 0."""
        self._min_update_interval_seconds = min_update_interval_seconds
        self._last_update = 0  # type: float

    def ready(self):
        # type: () -> bool
        """Return True once the minimum interval has elapsed since reset."""
        elapsed = time.time() - self._last_update
        return elapsed >= self._min_update_interval_seconds

    def reset(self):
        # type: () -> None
        """Record the current time as the moment of the last update."""
        self._last_update = time.time()


@contextlib.contextmanager
def open_spinner(message):
    # type: (str) -> Iterator[SpinnerInterface]
    """Yield a spinner for ``message`` and finish it when the block exits.

    The spinner is finished with "done" on normal exit, "canceled" on
    ``KeyboardInterrupt`` and "error" on any other ``Exception``; in the
    latter two cases the exception is re-raised.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    if not sys.stdout.isatty() or logger.getEffectiveLevel() > logging.INFO:
        spinner = NonInteractiveSpinner(message)  # type: SpinnerInterface
    else:
        spinner = InteractiveSpinner(message)

    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")


@contextlib.contextmanager
def hidden_cursor(file):
    # type: (IO[str]) -> Iterator[None]
    """Hide the cursor on ``file`` for the duration of the ``with`` block.

    Does nothing on Windows, when ``file`` is not a tty, or when the
    effective log level is above INFO. Otherwise the show-cursor code is
    written on exit even if the block raises.
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    # We also don't want to clutter the output with control characters if
    # we're writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    leave_cursor_alone = (
        WINDOWS
        or not file.isatty()
        or logger.getEffectiveLevel() > logging.INFO
    )
    if leave_cursor_alone:
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        file.write(SHOW_CURSOR)