from __future__ import absolute_import import logging import sys import textwrap from collections import OrderedDict from pip._vendor import pkg_resources from pip._vendor.packaging.version import parse as parse_version # NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is # why we ignore the type on this import from pip._vendor.six.moves import xmlrpc_client # type: ignore from pip._internal.cli.base_command import Command from pip._internal.cli.req_command import SessionCommandMixin from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS from pip._internal.exceptions import CommandError from pip._internal.models.index import PyPI from pip._internal.network.xmlrpc import PipXmlrpcTransport from pip._internal.utils.compat import get_terminal_size from pip._internal.utils.logging import indent_log from pip._internal.utils.misc import get_distribution, write_output from pip._internal.utils.typing import MYPY_CHECK_RUNNING if MYPY_CHECK_RUNNING: from optparse import Values from typing import Dict, List, Optional from typing_extensions import TypedDict TransformedHit = TypedDict( 'TransformedHit', {'name': str, 'summary': str, 'versions': List[str]}, ) logger = logging.getLogger(__name__) class SearchCommand(Command, SessionCommandMixin): """Search for PyPI packages whose name or summary contains .""" usage = """ %prog [options] """ ignore_require_venv = True def add_options(self): # type: () -> None self.cmd_opts.add_option( '-i', '--index', dest='index', metavar='URL', default=PyPI.pypi_url, help='Base URL of Python Package Index (default %default)') self.parser.insert_option_group(0, self.cmd_opts) def run(self, options, args): # type: (Values, List[str]) -> int if not args: raise CommandError('Missing required argument (search query).') query = args pypi_hits = self.search(query, options) hits = transform_hits(pypi_hits) terminal_width = None if sys.stdout.isatty(): terminal_width = get_terminal_size()[0] print_results(hits, terminal_width=terminal_width) if pypi_hits: return SUCCESS return NO_MATCHES_FOUND def search(self, query, options): # type: (List[str], Values) -> List[Dict[str, str]] index_url = options.index session = self.get_default_session(options) transport = PipXmlrpcTransport(index_url, session) pypi = xmlrpc_client.ServerProxy(index_url, transport) try: hits = pypi.search({'name': query, 'summary': query}, 'or') except xmlrpc_client.Fault as fault: message = "XMLRPC request failed [code: {code}]\n{string}".format( code=fault.faultCode, string=fault.faultString, ) raise CommandError(message) return hits def transform_hits(hits): # type: (List[Dict[str, str]]) -> List[TransformedHit] """ The list from pypi is really a list of versions. We want a list of packages with the list of versions stored inline. This converts the list from pypi into one we can use. """ packages = OrderedDict() # type: OrderedDict[str, TransformedHit] for hit in hits: name = hit['name'] summary = hit['summary'] version = hit['version'] if name not in packages.keys(): packages[name] = { 'name': name, 'summary': summary, 'versions': [version], } else: packages[name]['versions'].append(version) # if this is the highest version, replace summary and score if version == highest_version(packages[name]['versions']): packages[name]['summary'] = summary return list(packages.values()) def print_results(hits, name_column_width=None, terminal_width=None): # type: (List[TransformedHit], Optional[int], Optional[int]) -> None if not hits: return if name_column_width is None: name_column_width = max([ len(hit['name']) + len(highest_version(hit.get('versions', ['-']))) for hit in hits ]) + 4 installed_packages = [p.project_name for p in pkg_resources.working_set] for hit in hits: name = hit['name'] summary = hit['summary'] or '' latest = highest_version(hit.get('versions', ['-'])) if terminal_width is not None: target_width = terminal_width - name_column_width - 5 if target_width > 10: # wrap and indent summary to fit terminal summary_lines = textwrap.wrap(summary, target_width) summary = ('\n' + ' ' * (name_column_width + 3)).join( summary_lines) line = '{name_latest:{name_column_width}} - {summary}'.format( name_latest='{name} ({latest})'.format(**locals()), **locals()) try: write_output(line) if name in installed_packages: dist = get_distribution(name) assert dist is not None with indent_log(): if dist.version == latest: write_output('INSTALLED: %s (latest)', dist.version) else: write_output('INSTALLED: %s', dist.version) if parse_version(latest).pre: write_output('LATEST: %s (pre-release; install' ' with "pip install --pre")', latest) else: write_output('LATEST: %s', latest) except UnicodeEncodeError: pass def highest_version(versions): # type: (List[str]) -> str return max(versions, key=parse_version)