"""Build Environment used for isolation during sdist building
import logging
import os
import sys
from distutils.sysconfig import get_python_lib
from sysconfig import get_paths
from pip._vendor.pkg_resources import Requirement, VersionConflict, WorkingSet
from pip._internal.utils.misc import call_subprocess
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.ui import open_spinner
logger = logging.getLogger(__name__)
class BuildEnvironment(object):
"""Creates and manages an isolated environment to install build deps
def __init__(self):
self._temp_dir = TempDirectory(kind="build-env")
def path(self):
return self._temp_dir.path
def __enter__(self):
self.save_path = os.environ.get('PATH', None)
self.save_pythonpath = os.environ.get('PYTHONPATH', None)
self.save_nousersite = os.environ.get('PYTHONNOUSERSITE', None)
install_scheme = 'nt' if ( == 'nt') else 'posix_prefix'
install_dirs = get_paths(install_scheme, vars={
'base': self.path,
'platbase': self.path,
scripts = install_dirs['scripts']
if self.save_path:
os.environ['PATH'] = scripts + os.pathsep + self.save_path
os.environ['PATH'] = scripts + os.pathsep + os.defpath
# Note: prefer distutils' sysconfig to get the
# library paths so PyPy is correctly supported.
purelib = get_python_lib(plat_specific=0, prefix=self.path)
platlib = get_python_lib(plat_specific=1, prefix=self.path)
if purelib == platlib:
lib_dirs = purelib
lib_dirs = purelib + os.pathsep + platlib
if self.save_pythonpath:
os.environ['PYTHONPATH'] = lib_dirs + os.pathsep + \
os.environ['PYTHONPATH'] = lib_dirs
os.environ['PYTHONNOUSERSITE'] = '1'
return self.path
def __exit__(self, exc_type, exc_val, exc_tb):
def restore_var(varname, old_value):
if old_value is None:
os.environ.pop(varname, None)
os.environ[varname] = old_value
restore_var('PATH', self.save_path)
restore_var('PYTHONPATH', self.save_pythonpath)
restore_var('PYTHONNOUSERSITE', self.save_nousersite)
def cleanup(self):
def missing_requirements(self, reqs):
"""Return a list of the requirements from reqs that are not present
missing = []
with self:
ws = WorkingSet(os.environ["PYTHONPATH"].split(os.pathsep))
for req in reqs:
if ws.find(Requirement.parse(req)) is None:
except VersionConflict:
return missing
def install_requirements(self, finder, requirements, message):
args = [
sys.executable, '-m', 'pip', 'install', '--ignore-installed',
'--no-user', '--prefix', self.path, '--no-warn-script-location',
if logger.getEffectiveLevel() <= logging.DEBUG:
for format_control in ('no_binary', 'only_binary'):
formats = getattr(finder.format_control, format_control)
args.extend(('--' + format_control.replace('_', '-'),
','.join(sorted(formats or {':none:'}))))
if finder.index_urls:
args.extend(['-i', finder.index_urls[0]])
for extra_index in finder.index_urls[1:]:
args.extend(['--extra-index-url', extra_index])
for link in finder.find_links:
args.extend(['--find-links', link])
for _, host, _ in finder.secure_origins:
args.extend(['--trusted-host', host])
if finder.allow_all_prereleases:
if finder.process_dependency_links:
with open_spinner(message) as spinner:
call_subprocess(args, show_stdout=False, spinner=spinner)
class NoOpBuildEnvironment(BuildEnvironment):
"""A no-op drop-in replacement for BuildEnvironment
def __init__(self):
def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
def cleanup(self):
def install_requirements(self, finder, requirements, message):
raise NotImplementedError()
"""Cache Management
import errno
import hashlib
import logging
import os
from pip._vendor.packaging.utils import canonicalize_name
from import path_to_url
from import Link
from pip._internal.utils.compat import expanduser
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.wheel import InvalidWheelFilename, Wheel
logger = logging.getLogger(__name__)
class Cache(object):
"""An abstract class - provides cache directories for data from links
:param cache_dir: The root of the cache.
:param format_control: An object of FormatControl class to limit
binaries being read from the cache.
:param allowed_formats: which formats of files the cache should store.
('binary' and 'source' are the only allowed values)
def __init__(self, cache_dir, format_control, allowed_formats):
super(Cache, self).__init__()
self.cache_dir = expanduser(cache_dir) if cache_dir else None
self.format_control = format_control
self.allowed_formats = allowed_formats
_valid_formats = {"source", "binary"}
assert self.allowed_formats.union(_valid_formats) == _valid_formats
def _get_cache_path_parts(self, link):
"""Get parts of part that must be os.path.joined with cache_dir
# We want to generate an url to use as our cache key, we don't want to
# just re-use the URL because it might have other items in the fragment
# and we don't care about those.
key_parts = [link.url_without_fragment]
if link.hash_name is not None and link.hash is not None:
key_parts.append("=".join([link.hash_name, link.hash]))
key_url = "#".join(key_parts)
# Encode our key url with sha224, we'll use this because it has similar
# security properties to sha256, but with a shorter total output (and
# thus less secure). However the differences don't make a lot of
# difference for our use case here.
hashed = hashlib.sha224(key_url.encode()).hexdigest()
# We want to nest the directories some to prevent having a ton of top
# level directories where we might run out of sub directories on some
# FS.
parts = [hashed[:2], hashed[2:4], hashed[4:6], hashed[6:]]
return parts
def _get_candidates(self, link, package_name):
can_not_cache = (
not self.cache_dir or
not package_name or
not link
if can_not_cache:
return []
canonical_name = canonicalize_name(package_name)
formats = self.format_control.get_allowed_formats(
if not self.allowed_formats.intersection(formats):
return []
root = self.get_path_for_link(link)
return os.listdir(root)
except OSError as err:
if err.errno in {errno.ENOENT, errno.ENOTDIR}:
return []
def get_path_for_link(self, link):
"""Return a directory to store cached items in for link.
raise NotImplementedError()
def get(self, link, package_name):
"""Returns a link to a cached item if it exists, otherwise returns the
passed link.
raise NotImplementedError()
def _link_for_candidate(self, link, candidate):
root = self.get_path_for_link(link)
path = os.path.join(root, candidate)
return Link(path_to_url(path))
def cleanup(self):
class SimpleWheelCache(Cache):
"""A cache of wheels for future installs.
def __init__(self, cache_dir, format_control):
super(SimpleWheelCache, self).__init__(
cache_dir, format_control, {"binary"}
def get_path_for_link(self, link):
"""Return a directory to store cached wheels for link
Because there are M wheels for any one sdist, we provide a directory
to cache them in, and then consult that directory when looking up
cache hits.
We only insert things into the cache if they have plausible version
numbers, so that we don't contaminate the cache with things that were
not unique. E.g. ./package might have dozens of installs done for it
and build a version of 0.0...and if we built and cached a wheel, we'd
end up using the same wheel even if the source has been edited.
:param link: The link of the sdist for which this will cache wheels.
parts = self._get_cache_path_parts(link)
# Store wheels within the root cache_dir
return os.path.join(self.cache_dir, "wheels", *parts)
def get(self, link, package_name):
candidates = []
for wheel_name in self._get_candidates(link, package_name):
wheel = Wheel(wheel_name)
except InvalidWheelFilename:
if not wheel.supported():
# Built for a different python/arch/etc
candidates.append((wheel.support_index_min(), wheel_name))
if not candidates:
return link
return self._link_for_candidate(link, min(candidates)[1])
class EphemWheelCache(SimpleWheelCache):
"""A SimpleWheelCache that creates it's own temporary cache directory
def __init__(self, format_control):
self._temp_dir = TempDirectory(kind="ephem-wheel-cache")
super(EphemWheelCache, self).__init__(
self._temp_dir.path, format_control
def cleanup(self):
class WheelCache(Cache):
"""Wraps EphemWheelCache and SimpleWheelCache into a single Cache
This Cache allows for gracefully degradation, using the ephem wheel cache
when a certain link is not found in the simple wheel cache first.
def __init__(self, cache_dir, format_control):
super(WheelCache, self).__init__(
cache_dir, format_control, {'binary'}
self._wheel_cache = SimpleWheelCache(cache_dir, format_control)
self._ephem_cache = EphemWheelCache(format_control)
def get_path_for_link(self, link):
return self._wheel_cache.get_path_for_link(link)
def get_ephem_path_for_link(self, link):
return self._ephem_cache.get_path_for_link(link)
def get(self, link, package_name):
retval = self._wheel_cache.get(link, package_name)
if retval is link:
retval = self._ephem_cache.get(link, package_name)
return retval
def cleanup(self):
"""Subpackage containing all of pip's command line interface related code
# This file intentionally does not import submodules
"""Logic that powers autocompletion installed by ``pip completion``.
import optparse
import os
import sys
from pip._internal.cli.main_parser import create_main_parser
from pip._internal.commands import commands_dict, get_summaries
from pip._internal.utils.misc import get_installed_distributions
def autocomplete():
"""Entry Point for completion of main and subcommand options.
# Don't complete if user hasn't sourced bash_completion file.
if 'PIP_AUTO_COMPLETE' not in os.environ:
cwords = os.environ['COMP_WORDS'].split()[1:]
cword = int(os.environ['COMP_CWORD'])
current = cwords[cword - 1]
except IndexError:
current = ''
subcommands = [cmd for cmd, summary in get_summaries()]
options = []
# subcommand
subcommand_name = [w for w in cwords if w in subcommands][0]
except IndexError:
subcommand_name = None
parser = create_main_parser()
# subcommand options
if subcommand_name:
# special case: 'help' subcommand has no options
if subcommand_name == 'help':
# special case: list locally installed dists for show and uninstall
should_list_installed = (
subcommand_name in ['show', 'uninstall'] and
not current.startswith('-')
if should_list_installed:
installed = []
lc = current.lower()
for dist in get_installed_distributions(local_only=True):
if dist.key.startswith(lc) and dist.key not in cwords[1:]:
# if there are no dists installed, fall back to option completion
if installed:
for dist in installed:
subcommand = commands_dict[subcommand_name]()
for opt in subcommand.parser.option_list_all:
if != optparse.SUPPRESS_HELP:
for opt_str in opt._long_opts + opt._short_opts:
options.append((opt_str, opt.nargs))
# filter out previously specified options from available options
prev_opts = [x.split('=')[0] for x in cwords[1:cword - 1]]
options = [(x, v) for (x, v) in options if x not in prev_opts]
# filter options by current input
options = [(k, v) for k, v in options if k.startswith(current)]
# get completion type given cwords and available subcommand options
completion_type = get_path_completion_type(
cwords, cword, subcommand.parser.option_list_all,
# get completion files and directories if ``completion_type`` is
# ``<file>``, ``<dir>`` or ``<path>``
if completion_type:
options = auto_complete_paths(current, completion_type)
options = ((opt, 0) for opt in options)
for option in options:
opt_label = option[0]
# append '=' to options which require args
if option[1] and option[0][:2] == "--":
opt_label += '='
# show main parser options only when necessary
opts = [i.option_list for i in parser.option_groups]
opts = (o for it in opts for o in it)
if current.startswith('-'):
for opt in opts:
if != optparse.SUPPRESS_HELP:
subcommands += opt._long_opts + opt._short_opts
# get completion type given cwords and all available options
completion_type = get_path_completion_type(cwords, cword, opts)
if completion_type:
subcommands = auto_complete_paths(current, completion_type)
print(' '.join([x for x in subcommands if x.startswith(current)]))
def get_path_completion_type(cwords, cword, opts):
"""Get the type of path completion (``file``, ``dir``, ``path`` or None)
:param cwords: same as the environmental variable ``COMP_WORDS``
:param cword: same as the environmental variable ``COMP_CWORD``
:param opts: The available options to check
:return: path completion type (``file``, ``dir``, ``path`` or None)
if cword < 2 or not cwords[cword - 2].startswith('-'):
for opt in opts:
if == optparse.SUPPRESS_HELP:
for o in str(opt).split('/'):
if cwords[cword - 2].split('=')[0] == o:
if not opt.metavar or any(
x in ('path', 'file', 'dir')
for x in opt.metavar.split('/')):
return opt.metavar
def auto_complete_paths(current, completion_type):
"""If ``completion_type`` is ``file`` or ``path``, list all regular files
and directories starting with ``current``; otherwise only list directories
starting with ``current``.
:param current: The word to be completed
:param completion_type: path completion type(`file`, `path` or `dir`)i
:return: A generator of regular files and/or directories
directory, filename = os.path.split(current)
current_path = os.path.abspath(directory)
# Don't complete paths if they can't be accessed
if not os.access(current_path, os.R_OK):
filename = os.path.normcase(filename)
# list all files that start with ``filename``
file_list = (x for x in os.listdir(current_path)
if os.path.normcase(x).startswith(filename))
for f in file_list:
opt = os.path.join(current_path, f)
comp_file = os.path.normcase(os.path.join(directory, f))
# complete regular files when there is not ``<dir>`` after option
# complete directories when there is ``<file>``, ``<path>`` or
# ``<dir>``after option
if completion_type != 'dir' and os.path.isfile(opt):
yield comp_file
elif os.path.isdir(opt):
yield os.path.join(comp_file, '')
"""Base Command class, and related routines"""
from __future__ import absolute_import
import logging
import logging.config
import optparse
import os
import sys
from pip._internal.cli import cmdoptions
from pip._internal.cli.parser import (
ConfigOptionParser, UpdatingDefaultsHelpFormatter,
from pip._internal.cli.status_codes import (
from import PipSession
from pip._internal.exceptions import (
BadCommand, CommandError, InstallationError, PreviousBuildDirError,
from pip._internal.index import PackageFinder
from pip._internal.locations import running_under_virtualenv
from pip._internal.req.constructors import (
install_req_from_editable, install_req_from_line,
from pip._internal.req.req_file import parse_requirements
from pip._internal.utils.logging import setup_logging
from pip._internal.utils.misc import get_prog, normalize_path
from pip._internal.utils.outdated import pip_version_check
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from typing import Optional # noqa: F401
__all__ = ['Command']
logger = logging.getLogger(__name__)
class Command(object):
name = None # type: Optional[str]
usage = None # type: Optional[str]
hidden = False # type: bool
ignore_require_venv = False # type: bool
def __init__(self, isolated=False):
parser_kw = {
'usage': self.usage,
'prog': '%s %s' % (get_prog(),,
'formatter': UpdatingDefaultsHelpFormatter(),