from __future__ import print_function, division, absolute_import

import collections
import hashlib
import logging
import os
import re
import subprocess
import sys
import tempfile
from functools import partial
from os.path import abspath, isdir, join

try:
    # ``collections.Hashable`` was removed in Python 3.10.
    from collections.abc import Hashable
except ImportError:  # Python 2
    from collections import Hashable

log = logging.getLogger(__name__)
stderrlog = logging.getLogger('stderrlog')


class memoized(object):
    """Decorator. Caches a function's return value each time it is called.

    If called later with the same arguments, the cached value is returned
    (not reevaluated).  Positional ``list`` arguments are converted to tuples
    for the cache key; any call whose arguments cannot be hashed is simply
    forwarded to the wrapped function without caching.
    """

    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __call__(self, *args, **kw):
        newargs = []
        for arg in args:
            if isinstance(arg, list):
                newargs.append(tuple(arg))
            elif not isinstance(arg, Hashable):
                # uncacheable. a dict, for instance.
                # better to not cache than blow up.
                return self.func(*args, **kw)
            else:
                newargs.append(arg)
        try:
            key = (tuple(newargs), frozenset(kw.items()))
            is_cached = key in self.cache
        except TypeError:
            # An unhashable keyword value, or a list holding unhashable
            # items: same policy as above, call through without caching.
            return self.func(*args, **kw)
        if is_cached:
            return self.cache[key]
        value = self.func(*args, **kw)
        self.cache[key] = value
        return value


# For instance methods only
class memoize(object):  # 577452
    """Descriptor that memoizes an instance method on the instance itself.

    Results are stored in a per-instance dict (created lazily on first call),
    keyed by the wrapped function, the positional arguments after ``self``
    and the keyword arguments.  All arguments must be hashable.
    """

    def __init__(self, func):
        self.func = func

    def __get__(self, obj, objtype=None):
        # Accessed on the class: hand back the plain function.
        if obj is None:
            return self.func
        # Accessed on an instance: bind the instance as first argument.
        return partial(self, obj)

    def __call__(self, *args, **kw):
        obj = args[0]
        try:
            # Name-mangled by Python to ``obj._memoize__cache``.
            cache = obj.__cache
        except AttributeError:
            cache = obj.__cache = {}
        key = (self.func, args[1:], frozenset(sorted(kw.items())))
        try:
            res = cache[key]
        except KeyError:
            res = cache[key] = self.func(*args, **kw)
        return res


@memoized
def gnu_get_libc_version():
    """
    If on linux, get installed version of glibc, otherwise return None
    """
    if not sys.platform.startswith('linux'):
        return None

    from ctypes import CDLL, cdll, c_char_p

    cdll.LoadLibrary('libc.so.6')
    libc = CDLL('libc.so.6')
    f = libc.gnu_get_libc_version
    # The C function returns ``const char *``; without this ctypes would
    # interpret the result as an int.
    f.restype = c_char_p
    return f()


def can_open(file):
    """
    Return True if the given ``file`` can be opened for writing

    The file is opened in append mode, so existing content is preserved (a
    missing file is created as a side effect).  Failures are reported on the
    ``stderrlog`` logger.
    """
    try:
        fp = open(file, "ab")
        fp.close()
        return True
    except IOError:
        stderrlog.info("Unable to open %s\n" % file)
        return False


def can_open_all(files):
    """
    Return True if all of the provided ``files`` can be opened
    """
    # ``all`` short-circuits on the first failure, like the explicit loop.
    return all(can_open(f) for f in files)


def can_open_all_files_in_prefix(prefix, files):
    """
    Returns True if all ``files`` at a given ``prefix`` can be opened
    """
    return can_open_all((os.path.join(prefix, f) for f in files))


def try_write(dir_path):
    """Return True if ``dir_path`` is an existing directory we can write to.

    Writability is probed by actually creating (and removing) a small file,
    see #2151.
    """
    if not isdir(dir_path):
        return False
    # The pid keeps concurrent processes from clobbering each other's probe.
    temp_filename = join(dir_path, '.conda-try-write-%d' % os.getpid())
    try:
        with open(temp_filename, mode='wb') as fo:
            fo.write(b'This is a test file.\n')
        os.unlink(temp_filename)
        return True
    except (IOError, OSError):
        return False


def hashsum_file(path, mode='md5'):
    """Return the hex digest of the file at ``path``.

    ``mode`` is any algorithm name accepted by ``hashlib.new``.  The file is
    read in 256KB chunks so large files are never held in memory at once.
    """
    h = hashlib.new(mode)
    with open(path, 'rb') as fi:
        for chunk in iter(partial(fi.read, 262144), b''):
            h.update(chunk)
    return h.hexdigest()


def md5_file(path):
    """Return the MD5 hex digest of the file at ``path``."""
    return hashsum_file(path, 'md5')


def url_path(path):
    """Return a ``file://`` URL for ``path`` (made absolute first).

    On Windows the drive colon becomes ``|`` and backslashes become forward
    slashes.
    """
    path = abspath(path)
    if sys.platform == 'win32':
        path = '/' + path.replace(':', '|').replace('\\', '/')
    return 'file://%s' % path


def run_in(command, shell, cwd=None, env=None):
    """Run ``command`` in the given ``shell`` and capture its output.

    ``shell`` is either a key of the module-level ``shells`` table or one of
    its entries (a mapping with an ``"exe"`` key).  ``cwd`` and ``env`` are
    handed to ``subprocess.Popen``.

    Returns a two-item list ``[stdout, stderr]`` of text decoded as UTF-8,
    with ``\\r\\n`` normalised to ``\\n`` and trailing newlines stripped.
    Raises ``NotImplementedError`` for ``'powershell'``.
    """
    if hasattr(shell, "keys"):
        shell = shell["exe"]
    if shell == 'cmd.exe':
        # cmd.exe gets the command through a temporary batch file.
        cmd_script = tempfile.NamedTemporaryFile(suffix='.bat', mode='wt', delete=False)
        cmd_script.write(command)
        cmd_script.close()
        cmd_bits = [shells[shell]["exe"]] + shells[shell]["shell_args"] + [cmd_script.name]
        try:
            p = subprocess.Popen(cmd_bits, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                 cwd=cwd, env=env)
            stdout, stderr = p.communicate()
        finally:
            os.unlink(cmd_script.name)
    elif shell == 'powershell':
        raise NotImplementedError
    else:
        cmd_bits = ([shells[shell]["exe"]] + shells[shell]["shell_args"] +
                    [translate_stream(command, shells[shell]["path_to"])])
        # cwd/env used to be silently dropped on this branch.
        p = subprocess.Popen(cmd_bits, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             cwd=cwd, env=env)
        stdout, stderr = p.communicate()
    streams = [u"%s" % stream.decode('utf-8').replace('\r\n', '\n').rstrip("\n")
               for stream in (stdout, stderr)]
    return streams


def path_identity(path):
    """Used as a dummy path converter where no conversion necessary"""
    return path


def win_path_to_unix(path, root_prefix=""):
    """Convert a path or ;-separated string of paths into a unix representation

    Does not add cygdrive.  If you need that, set root_prefix to "/cygdrive"
    """
    # Group 1 is a drive-letter path (``X:\\...`` or ``X:/...``).  The
    # lookbehind keeps us from matching inside URLs or already-converted
    # text; the trailing lookahead stops before the next ``X:`` drive spec.
    path_re = r'(?<![:/^a-zA-Z])([a-zA-Z]:[\/\\]+(?:[^:*?"<>|]+[\/\\]+)*[^:*?"<>|;\/\\]+?(?![a-zA-Z]:))'  # noqa

    def _translation(found_path):
        found = found_path.group(1).replace("\\", "/").replace(":", "").replace("//", "/")
        return root_prefix + "/" + found

    # After translation every entry starts with "/", so ";/" marks the old
    # Windows list separator and becomes the unix ":".
    path = re.sub(path_re, _translation, path).replace(";/", ":/")
    return path


def unix_path_to_win(path, root_prefix=""):
    """Convert a path or :-separated string of paths into a Windows representation

    Does not add cygdrive.  If you need that, set root_prefix to "/cygdrive"
    """
    if len(path) > 1 and (";" in path or (path[1] == ":" and path.count(":") == 1)):
        # already a windows path
        return path.replace("/", "\\")
    # root_prefix is literal text, so escape it before embedding it.
    path_re = re.escape(root_prefix) + r'(/[a-zA-Z]/(?:(?![:\s]/)[^:*?"<>])*)'

    def _translation(found_path):
        group = found_path.group(0)
        # group is "<root_prefix>/<drive>/<rest>": pick the drive letter and
        # turn the remainder into a backslash path.
        return "{0}:{1}".format(group[len(root_prefix)+1],
                                group[len(root_prefix)+2:].replace("/", "\\"))

    translation = re.sub(path_re, _translation, path)
    # A ":" directly in front of a drive spec was the unix list separator.
    translation = re.sub(r":([a-zA-Z]):\\",
                         lambda match: ";" + match.group(0)[1] + ":\\", translation)
    return translation


# curry cygwin functions
def win_path_to_cygwin(path):
    """``win_path_to_unix`` with the ``/cygdrive`` root prefix."""
    return win_path_to_unix(path, "/cygdrive")


def cygwin_path_to_win(path):
    """``unix_path_to_win`` with the ``/cygdrive`` root prefix."""
    return unix_path_to_win(path, "/cygdrive")


def translate_stream(stream, translator):
    """Apply ``translator`` to every ``\\n``-separated line of ``stream``."""
    return "\n".join(translator(line) for line in stream.split("\n"))


def human_bytes(n):
    """
    Return the number of bytes n in more human readable form.

    >>> human_bytes(1023)
    '1023 B'
    >>> human_bytes(2048)
    '2 KB'
    >>> human_bytes(1024 ** 3)
    '1.00 GB'
    """
    if n < 1024:
        return '%d B' % n
    k = n/1024
    if k < 1024:
        return '%d KB' % round(k)
    m = k/1024
    if m < 1024:
        return '%.1f MB' % m
    g = m/1024
    return '%.2f GB' % g


# This is necessary for Windows, for linking the environment, and for printing the correct
# activation instructions on Windows, depending on the shell type.  It would be great to
# get rid of it, but I don't know how to otherwise detect which shell is used to create
# or install conda packages.
def find_parent_shell(path=False, max_stack_depth=10):
    """return process name or path of parent.  Default is to return only name of process.

    Starting from this process's parent, ancestors whose name contains
    "conda", "python" or "py.test" are skipped (at most ``max_stack_depth``
    of them).  Returns ``None`` when psutil is not installed, and the
    platform default shell name ('cmd.exe' or 'bash') when the process tree
    runs out before a candidate is found.
    """
    try:
        import psutil
    except ImportError:
        stderrlog.warning("No psutil available.\n"
                          "To proceed, please conda install psutil")
        return None

    wrappers = ("conda", "python", "py.test")
    parent = psutil.Process().parent()
    stack_depth = 0
    # ``parent()`` returns None at the top of the process tree, so check it
    # before touching ``name()``.
    while (parent is not None and stack_depth < max_stack_depth and
           any(proc in parent.name().lower() for proc in wrappers)):
        parent = parent.parent()
        stack_depth += 1

    if parent is None:
        # fallback defaults to system default
        return 'cmd.exe' if sys.platform == 'win32' else 'bash'
    if path:
        return parent.exe()
    return parent.name()


@memoized
def get_yaml():
    """Return the first importable yaml module.

    Tries ``ruamel_yaml``, then ``ruamel.yaml``, then ``yaml``; exits the
    interpreter with an explanatory message when none is available.
    """
    try:
        import ruamel_yaml as yaml
    except ImportError:
        try:
            import ruamel.yaml as yaml
        except ImportError:
            try:
                import yaml
            except ImportError:
                sys.exit("No yaml library available.\n"
                         "To proceed, please conda install ruamel_yaml")
    return yaml


def yaml_load(filehandle):
    """Parse YAML from ``filehandle`` using the module from ``get_yaml``.

    Uses the round-trip loader when the module provides one, otherwise the
    module's default ``load``.
    """
    yaml = get_yaml()
    try:
        return yaml.load(filehandle, Loader=yaml.RoundTripLoader, version="1.1")
    except AttributeError:
        # NOTE(review): a bare ``yaml.load`` may construct arbitrary objects
        # with some yaml libraries/versions -- only feed it trusted input.
        return yaml.load(filehandle)


def yaml_dump(string):
    """Serialise ``string`` (any yaml-dumpable object) to block-style YAML."""
    yaml = get_yaml()
    try:
        return yaml.dump(string, Dumper=yaml.RoundTripDumper,
                         block_seq_indent=2, default_flow_style=False,
                         indent=4)
    except AttributeError:
        return yaml.dump(string, default_flow_style=False)


# TODO: this should be done in a more extensible way
#     (like files for each shell, with some registration mechanism.)

# Settings shared by every unix-like shell.  Note: there is no "exe" entry
# here; each concrete shell must set it to either an executable on PATH, or a
# full path to an executable for a shell.
unix_shell_base = {
    "binpath": "/bin/",  # mind the trailing slash.
    "echo": "echo",
    "env_script_suffix": ".sh",
    "nul": '2>/dev/null',
    "path_from": path_identity,
    "path_to": path_identity,
    "pathsep": ":",
    "printdefaultenv": 'echo $CONDA_DEFAULT_ENV',
    "printpath": "echo $PATH",
    "printps1": 'echo $PS1',
    "promptvar": 'PS1',
    "sep": "/",
    "set_var": 'export ',
    "shell_args": ["-l", "-c"],
    "shell_suffix": "",
    "slash_convert": ("\\", "/"),
    "source_setup": "source",
    "test_echo_extra": "",
    "var_format": "${}",
}

# Unix-style shells running on Windows: same as the unix base, but paths are
# converted between the two notations and scripts live under /Scripts/.
msys2_shell_base = dict(unix_shell_base)
msys2_shell_base.update(
    path_from=unix_path_to_win,
    path_to=win_path_to_unix,
    binpath="/Scripts/",  # mind the trailing slash.
)

if sys.platform == "win32":
    shells = {
        # "powershell.exe": dict(
        #    echo="echo",
        #    test_echo_extra=" .",
        #    var_format="${var}",
        #    binpath="/bin/",  # mind the trailing slash.
        #    source_setup="source",
        #    nul='2>/dev/null',
        #    set_var='export ',
        #    shell_suffix=".ps",
        #    env_script_suffix=".ps",
        #    printps1='echo $PS1',
        #    printdefaultenv='echo $CONDA_DEFAULT_ENV',
        #    printpath="echo %PATH%",
        #    exe="powershell.exe",
        #    path_from=path_identity,
        #    path_to=path_identity,
        #    slash_convert = ("/", "\\"),
        # ),
        "cmd.exe": {
            "echo": "@echo",
            "var_format": "%{}%",
            "binpath": "\\Scripts\\",  # mind the trailing slash.
            "source_setup": "call",
            "test_echo_extra": "",
            "nul": '1>NUL 2>&1',
            "set_var": 'set ',
            "shell_suffix": ".bat",
            "env_script_suffix": ".bat",
            "printps1": "@echo %PROMPT%",
            "promptvar": "PROMPT",
            # parens mismatched intentionally.  See http://stackoverflow.com/questions/20691060/how-do-i-echo-a-blank-empty-line-to-the-console-from-a-windows-batch-file # NOQA
            "printdefaultenv": ('IF NOT "%CONDA_DEFAULT_ENV%" == "" (\n'
                                'echo %CONDA_DEFAULT_ENV% ) ELSE (\n'
                                'echo()'),
            "printpath": "@echo %PATH%",
            "exe": "cmd.exe",
            "shell_args": ["/d", "/c"],
            "path_from": path_identity,
            "path_to": path_identity,
            "slash_convert": ("/", "\\"),
            "sep": "\\",
            "pathsep": ";",
        },
        "cygwin": dict(
            unix_shell_base,
            exe="bash.exe",
            binpath="/Scripts/",  # mind the trailing slash.
            path_from=cygwin_path_to_win,
            path_to=win_path_to_cygwin,
        ),
    }
    # bash is whichever bash is on PATH.  If using Cygwin, you should use the
    # cygwin entry instead.  The only major difference is that it handles
    # cygwin's /cygdrive filesystem root.
    shells.update(
        (exe_name, dict(msys2_shell_base, exe=exe_name))
        for exe_name in ("bash.exe", "bash", "sh.exe")
    )
else:
    shells = {
        exe_name: dict(unix_shell_base, exe=exe_name)
        for exe_name in ("bash", "zsh")
    }
    shells["fish"] = dict(unix_shell_base, exe="fish", pathsep=" ")