# Copyright (C) 2003-2021, Stefan Schwarzer # and ftputil contributors (see `doc/contributors.txt`) # See the file LICENSE for licensing terms. """ ftputil.path - simulate `os.path` for FTP servers """ import os import posixpath import stat import ftputil.error import ftputil.tool # The `_Path` class shouldn't be used directly by clients of the ftputil # library. __all__ = [] class _Path: """ Support class resembling `os.path`, accessible from the `FTPHost` object, e. g. as `FTPHost().path.abspath(path)`. Hint: substitute `os` with the `FTPHost` object. """ # `_Path` needs to provide all methods of `os.path`. # pylint: disable=too-many-instance-attributes def __init__(self, host): self._host = host self._encoding = host._encoding # Delegate these methods to the `posixpath` module because they don't # need file system access but work on the path strings (possibly # extracted from `PathLike` objects). # pylint: disable=invalid-name pp = posixpath self.basename = pp.basename self.commonprefix = pp.commonprefix self.dirname = pp.dirname self.isabs = pp.isabs self.join = pp.join self.normcase = pp.normcase self.normpath = pp.normpath self.split = pp.split self.splitdrive = pp.splitdrive self.splitext = pp.splitext def abspath(self, path): """ Return an absolute path. """ # Don't use `raise_for_empty_path` here since Python itself doesn't # raise an exception and just returns the current directory. original_path = path path = ftputil.tool.as_str_path(path, encoding=self._encoding) if not self.isabs(path): path = self.join(self._host.getcwd(), path) return ftputil.tool.same_string_type_as( os.fspath(original_path), self.normpath(path), self._encoding ) def exists(self, path): """ Return true if the path exists. """ if path in ["", b""]: return False try: lstat_result = self._host.lstat(path, _exception_for_missing_path=False) return lstat_result is not None except ftputil.error.RootDirError: return True def getmtime(self, path): """ Return the timestamp for the last modification for `path` as a float. This will raise `PermanentError` if the path doesn't exist, but maybe other exceptions depending on the state of the server (e. g. timeout). """ ftputil.tool.raise_for_empty_path(path) return self._host.stat(path).st_mtime def getsize(self, path): """ Return the size of the `path` item as an integer. This will raise `PermanentError` if the path doesn't exist, but maybe raise other exceptions depending on the state of the server (e. g. timeout). """ ftputil.tool.raise_for_empty_path(path) return self._host.stat(path).st_size # Check whether a path is a regular file/dir/link. For the first two cases # follow links (like in `os.path`). # # Implementation note: The previous implementations simply called `stat` or # `lstat` and returned `False` if they ended with raising a # `PermanentError`. That exception usually used to signal a missing path. # This approach has the problem, however, that exceptions caused by code # earlier in `lstat` are obscured by the exception handling in `isfile`, # `isdir` and `islink`. def _is_file_system_entity(self, path, dir_or_file): """ Return `True` if `path` represents the file system entity described by `dir_or_file` ("dir" or "file"). Return `False` if `path` isn't a directory or file, respectively or if `path` leads to an infinite chain of links. """ assert dir_or_file in ["dir", "file"] # Consider differences between directories and files. if dir_or_file == "dir": should_look_for_dir = True stat_function = stat.S_ISDIR else: should_look_for_dir = False stat_function = stat.S_ISREG # path = ftputil.tool.as_str_path(path, encoding=self._encoding) # Workaround if we can't go up from the current directory. The result # from `getcwd` should already be normalized. if self.normpath(path) == self._host.getcwd(): return should_look_for_dir try: stat_result = self._host.stat(path, _exception_for_missing_path=False) except ftputil.error.RecursiveLinksError: return False except ftputil.error.RootDirError: return should_look_for_dir else: if stat_result is None: # Non-existent path return False else: return stat_function(stat_result.st_mode) def isdir(self, path): """ Return true if the `path` exists and corresponds to a directory (no link). A non-existing path does _not_ cause a `PermanentError`, instead return `False`. """ if path in ["", b""]: return False return self._is_file_system_entity(path, "dir") def isfile(self, path): """ Return true if the `path` exists and corresponds to a regular file (no link). A non-existing path does _not_ cause a `PermanentError`, instead return `False`. """ if path in ["", b""]: return False return self._is_file_system_entity(path, "file") def islink(self, path): """ Return true if the `path` exists and is a link. A non-existing path does _not_ cause a `PermanentError`, instead return `False`. """ path = ftputil.tool.as_str_path(path, encoding=self._encoding) if path == "": return False try: lstat_result = self._host.lstat(path, _exception_for_missing_path=False) except ftputil.error.RootDirError: return False else: if lstat_result is None: # Non-existent path return False else: return stat.S_ISLNK(lstat_result.st_mode) def walk(self, top, func, arg): """ Directory tree walk with callback function. For each directory in the directory tree rooted at top (including top itself, but excluding "." and ".."), call func(arg, dirname, fnames). dirname is the name of the directory, and fnames a list of the names of the files and subdirectories in dirname (excluding "." and ".."). func may modify the fnames list in-place (e.g. via del or slice assignment), and walk will only recurse into the subdirectories whose names remain in fnames; this can be used to implement a filter, or to impose a specific order of visiting. No semantics are defined for, or required of, arg, beyond that arg is always passed to func. It can be used, e.g., to pass a filename pattern, or a mutable object designed to accumulate statistics. Passing None for arg is common. """ ftputil.tool.raise_for_empty_path(top, path_argument_name="top") top = ftputil.tool.as_str_path(top, encoding=self._encoding) # This code (and the above documentation) is taken from `posixpath.py`, # with slight modifications. try: names = self._host.listdir(top) except OSError: return func(arg, top, names) for name in names: name = self.join(top, name) try: stat_result = self._host.lstat(name) except OSError: continue if stat.S_ISDIR(stat_result[stat.ST_MODE]): self.walk(name, func, arg)