# Copyright 2020 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tools for starting, stopping, and attaching to a personal HTCondor pool."""

import atexit
import enum
import functools
import logging
import os
import shlex
import signal
import subprocess
import textwrap
import time
from pathlib import Path
from typing import List, Mapping, Optional, Set

import classad
import htcondor

__all__ = ["PersonalPool", "PersonalPoolState", "SetCondorConfig"]

logger = logging.getLogger(__name__)

IS_WINDOWS = os.name == "nt"

# FS authentication is not available on Windows; NTSSPI is used there instead.
AUTH_METHODS = ["FS" if not IS_WINDOWS else "NTSSPI", "PASSWORD", "IDTOKENS"]

# Configuration written into every personal pool's config file (see
# PersonalPool._write_config). User-supplied config is written after these.
DEFAULT_CONFIG = {
    "LOCAL_CONFIG_FILE": "",
    "MASTER_ADDRESS_FILE": "$(LOG)/.master_address",
    "COLLECTOR_ADDRESS_FILE": "$(LOG)/.collector_address",
    "SCHEDD_ADDRESS_FILE": "$(LOG)/.schedd_address",
    "JOB_QUEUE_LOG": "$(SPOOL)/job_queue.log",
    # TUNING
    "UPDATE_INTERVAL": "2",
    "POLLING_INTERVAL": "2",
    "NEGOTIATOR_INTERVAL": "2",
    "STARTER_UPDATE_INTERVAL": "2",
    "STARTER_INITIAL_UPDATE_INTERVAL": "2",
    "NEGOTIATOR_CYCLE_DELAY": "2",
    # SECURITY
    # TODO: revisit security; it would be much better to not hand-roll this
    "SEC_DAEMON_AUTHENTICATION": "REQUIRED",
    "SEC_CLIENT_AUTHENTICATION": "REQUIRED",
    "SEC_DEFAULT_AUTHENTICATION_METHODS": ", ".join(AUTH_METHODS),
    "self": "$(USERNAME)@$(UID_DOMAIN) $(USERNAME)@$(IPV4_ADDRESS) $(USERNAME)@$(IPV6_ADDRESS) $(USERNAME)@$(FULL_HOSTNAME) $(USERNAME)@$(HOSTNAME)",
    "ALLOW_READ": "$(self)",
    "ALLOW_WRITE": "$(self)",
    "ALLOW_DAEMON": "$(self)",
    "ALLOW_ADMINISTRATOR": "$(self)",
    "ALLOW_ADVERTISE": "$(self)",
    "ALLOW_NEGOTIATOR": "$(self)",
    "ALLOW_READ_COLLECTOR": "$(ALLOW_READ)",
    "ALLOW_READ_STARTD": "$(ALLOW_READ)",
    "ALLOW_NEGOTIATOR_SCHEDD": "$(ALLOW_NEGOTIATOR)",
    "ALLOW_WRITE_COLLECTOR": "$(ALLOW_WRITE)",
    "ALLOW_WRITE_STARTD": "$(ALLOW_WRITE)",
    # SLOT CONFIG
    "NUM_SLOTS": "1",
    "NUM_SLOTS_TYPE_1": "1",
    "SLOT_TYPE_1": "100%",
    "SLOT_TYPE_1_PARTITIONABLE": "TRUE",
}

# Written as "use ROLE: ..." / "use FEATURE: ..." lines in the config file.
ROLES = ["Personal"]
FEATURES = ["GPUs"]

# Parameters copied from the configuration that is active when this module is
# first imported, so the personal pool can find the HTCondor installation and
# the SSL files the ambient configuration points at.
INHERIT = {
    "RELEASE_DIR",
    "LIB",
    "BIN",
    "SBIN",
    "INCLUDE",
    "LIBEXEC",
    "SHARE",
    "AUTH_SSL_SERVER_CAFILE",
    "AUTH_SSL_CLIENT_CAFILE",
    "AUTH_SSL_SERVER_CERTFILE",
    "AUTH_SSL_SERVER_KEYFILE",
}

# Evaluated once, at import time.
INHERITED_PARAMS = {k: v for k, v in htcondor.param.items() if k in INHERIT}


def _skip_if(*states):
    """
    Decorator: turn a :class:`PersonalPool` method into a no-op that returns
    ``self`` when the pool's current state is one of ``states``.

    Should only be applied to PersonalPool methods that return self.
    """
    states = set(states)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.state in states:
                logger.debug(
                    "Skipping call to {} for {} because it is {}.".format(
                        func.__name__, self, self.state
                    )
                )
                return self

            return func(self, *args, **kwargs)

        return wrapper

    return decorator


class PersonalPoolState(str, enum.Enum):
    """
    An enumeration of the possible states that a :class:`PersonalPool` can be in.
    """

    UNINITIALIZED = "UNINITIALIZED"
    INITIALIZED = "INITIALIZED"
    STARTING = "STARTING"
    READY = "READY"
    STOPPING = "STOPPING"
    STOPPED = "STOPPED"


class PersonalPool:
    """
    A :class:`PersonalPool` is responsible for managing the lifecycle of a
    personal HTCondor pool. It can be used to start and stop a personal pool,
    and can also "attach" to an existing personal pool that is already running.
    """

    def __init__(
        self,
        local_dir: Optional[Path] = None,
        config: Optional[Mapping[str, str]] = None,
        raw_config: Optional[str] = None,
        detach: bool = False,
        use_config: bool = True,
    ):
        """
        Parameters
        ----------
        local_dir
            The local directory for the personal HTCondor pool.
            All configuration and state for the personal pool will be stored
            in this directory. Defaults to ``~/.condor/personal``.
        config
            HTCondor configuration parameters to inject,
            as a mapping of key-value pairs. ``None`` values are written as
            empty strings.
        raw_config
            Raw HTCondor configuration language to inject, as a string.
        detach
            If ``True``, the personal HTCondor pool will not be shut down
            when this object is destroyed (e.g., by stopping Python).
            Defaults to ``False``.
        use_config
            If ``True``, the environment variable ``CONDOR_CONFIG`` will be
            set during initialization, such that this personal pool appears
            to be the local HTCondor pool for all operations in this Python
            session, even ones that don't go through the :class:`PersonalPool`
            object. The personal pool will also be initialized.
            Defaults to ``True``.
        """
        self._state = PersonalPoolState.UNINITIALIZED

        # Make a best-effort attempt to stop the pool when Python exits.
        atexit.register(self._atexit)

        if local_dir is None:
            local_dir = Path.home() / ".condor" / "personal"
        self.local_dir = Path(local_dir).absolute()

        self._detach = detach

        self.execute_dir = self.local_dir / "execute"
        self.lock_dir = self.local_dir / "lock"
        self.log_dir = self.local_dir / "log"
        self.run_dir = self.local_dir / "run"
        self.spool_dir = self.local_dir / "spool"
        self.passwords_dir = self.local_dir / "passwords.d"
        self.tokens_dir = self.local_dir / "tokens.d"
        self.system_tokens_dir = self.local_dir / "system_tokens.d"

        self.config_file = self.local_dir / "condor_config"

        if config is None:
            config = {}
        self._config = {k: v if v is not None else "" for k, v in config.items()}
        self._raw_config = raw_config or ""

        # The subprocess.Popen for condor_master, if this object started it.
        # Stays None for attached pools.
        self.condor_master = None

        # SetCondorConfig instances activated by __enter__, so that __exit__
        # can restore exactly the CONDOR_CONFIG that was in effect before.
        self._context_configs = []

        if use_config:
            self.initialize()
            logger.debug("Setting CONDOR_CONFIG globally for {}".format(self))
            self.use_config().set()

    @property
    def state(self):
        """The current :class:`PersonalPoolState` of the personal pool."""
        return self._state

    @state.setter
    def state(self, state):
        old_state = self._state
        self._state = state
        logger.debug("State of {} changed from {} to {}".format(self, old_state, state))

    def __repr__(self):
        # Show whichever spelling of local_dir is shortest: absolute,
        # home-relative, or cwd-relative.
        shortest_path = min(
            (
                str(self.local_dir),
                "~/" + str(_try_relative_to(self.local_dir, Path.home())),
                "./" + str(_try_relative_to(self.local_dir, Path.cwd())),
            ),
            key=len,
        )

        return "{}(local_dir={}, state={})".format(
            type(self).__name__, shortest_path, self.state
        )

    def use_config(self):
        """
        Returns a :class:`SetCondorConfig` context manager that sets
        ``CONDOR_CONFIG`` to point to the configuration file for this personal
        pool.
        """
        return SetCondorConfig(self.config_file)

    @property
    def collector(self):
        """
        The :class:`htcondor.Collector` for the personal pool's collector.
        """
        with self.use_config():
            # This odd construction ensures that the Collector we return
            # doesn't just point to "the local collector" - that could be
            # overridden by changing CONDOR_CONFIG after the Collector
            # was initialized. Locating first keeps it stable.
            return htcondor.Collector(
                htcondor.Collector().locate(htcondor.DaemonTypes.Collector)
            )

    @property
    def schedd(self):
        """
        The :class:`htcondor.Schedd` for the personal pool's schedd.
        """
        with self.use_config():
            return htcondor.Schedd()

    def __enter__(self):
        context_config = self.use_config()
        context_config.set()
        self._context_configs.append(context_config)
        try:
            return self.start()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so restore the
            # previous CONDOR_CONFIG here instead.
            self._context_configs.pop().unset()
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.debug("Stop triggered for {} by context exit.".format(self))
        try:
            self.stop()
        finally:
            # Unset through the same SetCondorConfig that __enter__ used; a
            # fresh instance would not know the previous CONDOR_CONFIG value.
            if self._context_configs:
                self._context_configs.pop().unset()

    def __del__(self):
        logger.debug("Stop triggered for {} by object deletion.".format(self))
        self.stop()

    def _atexit(self):
        logger.debug("Stop triggered for {} by interpreter shutdown.".format(self))
        self.stop()

    @_skip_if(PersonalPoolState.READY)
    def start(self) -> "PersonalPool":
        """
        Start the personal condor (bringing it to the ``READY`` state from
        either ``UNINITIALIZED`` or ``INITIALIZED``).

        If any step fails, :meth:`stop` is called before the error is re-raised.

        Returns
        -------
        self : PersonalPool
            This method returns ``self``.
        """
        logger.info("Starting {}".format(self))

        try:
            self.initialize()
            self._start_condor()
            self._wait_for_ready()
        except BaseException:
            logger.exception(
                "Encountered error during setup of {}, cleaning up!".format(self)
            )
            self.stop()
            raise

        logger.info("Started {}".format(self))

        return self

    @_skip_if(
        PersonalPoolState.INITIALIZED,
        PersonalPoolState.STARTING,
        PersonalPoolState.READY,
    )
    def initialize(self, overwrite_config=True) -> "PersonalPool":
        """
        Initialize the personal pool by creating its local directory and
        writing out configuration files.

        The contents of the local directory (except for the configuration file
        if ``overwrite_config=True``) will not be overridden.

        Parameters
        ----------
        overwrite_config
            If ``True``, the existing configuration file will be overwritten
            with the configuration set up in the constructor.
            If ``False`` and there is an existing configuration file,
            an exception will be raised.
            Defaults to ``True``.

        Returns
        -------
        self : PersonalPool
            This method returns ``self``.
        """
        self._setup_local_dirs()
        self._write_config(overwrite=overwrite_config)
        self.state = PersonalPoolState.INITIALIZED

        return self

    def _setup_local_dirs(self):
        """Create every directory the pool uses; lock down the credential dirs."""
        for directory in (
            self.local_dir,
            self.execute_dir,
            self.lock_dir,
            self.log_dir,
            self.run_dir,
            self.spool_dir,
            self.passwords_dir,
            self.tokens_dir,
            self.system_tokens_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

        # Password and token directories hold credentials: owner-only access.
        self.passwords_dir.chmod(0o700)
        self.tokens_dir.chmod(0o700)
        self.system_tokens_dir.chmod(0o700)

    def _write_config(self, overwrite: bool = True) -> None:
        """
        Write the pool's config file.

        Sections are written in this order: inherited params, roles, features,
        base (directory) params, DEFAULT_CONFIG, constructor ``config``, and
        finally ``raw_config``. Later sections therefore come after, and take
        precedence over, earlier definitions of the same macro.

        Raises
        ------
        FileExistsError
            If ``overwrite`` is ``False`` and the config file already exists.
        """
        if not overwrite and self.config_file.exists():
            raise FileExistsError(
                "Found existing config file; refusing to write config because overwrite={}.".format(
                    overwrite
                )
            )

        self.config_file.parent.mkdir(parents=True, exist_ok=True)

        param_lines = []

        param_lines += ["# INHERITED"]
        param_lines += ["{} = {}".format(k, v) for k, v in INHERITED_PARAMS.items()]

        param_lines += ["# ROLES"]
        param_lines += ["use ROLE: {}".format(role) for role in ROLES]

        param_lines += ["# FEATURES"]
        param_lines += ["use FEATURE: {}".format(feature) for feature in FEATURES]

        base_config = {
            "LOCAL_DIR": self.local_dir.as_posix(),
            "EXECUTE": self.execute_dir.as_posix(),
            "LOCK": self.lock_dir.as_posix(),
            "LOG": self.log_dir.as_posix(),
            "RUN": self.run_dir.as_posix(),
            "SPOOL": self.spool_dir.as_posix(),
            "SEC_PASSWORD_DIRECTORY": self.passwords_dir.as_posix(),
            "SEC_TOKEN_DIRECTORY": self.tokens_dir.as_posix(),
            "SEC_TOKEN_SYSTEM_DIRECTORY": self.system_tokens_dir.as_posix(),
        }

        param_lines += ["# BASE PARAMS"]
        param_lines += ["{} = {}".format(k, v) for k, v in base_config.items()]

        param_lines += ["# DEFAULT PARAMS"]
        param_lines += ["{} = {}".format(k, v) for k, v in DEFAULT_CONFIG.items()]

        param_lines += ["# CUSTOM PARAMS"]
        param_lines += ["{} = {}".format(k, v) for k, v in self._config.items()]

        param_lines += ["# RAW PARAMS"]
        param_lines += textwrap.dedent(self._raw_config).splitlines()

        params = "\n".join(param_lines + [""])
        self.config_file.write_text(params)

    @_skip_if(PersonalPoolState.STARTING, PersonalPoolState.READY)
    def _start_condor(self):
        """
        Launch ``condor_master -f`` for this pool.

        Raises if a pool using the same local_dir already reports ready.
        """
        if self._is_ready():
            raise Exception(
                "Cannot start a {t} in the same local_dir as an already-running {t}.".format(
                    t=type(self).__name__
                )
            )

        with SetCondorConfig(self.config_file):
            # NOTE(review): stdout/stderr are piped but never read here; if
            # condor_master ever writes enough to fill the pipe buffer it
            # could block. Kept as-is because callers may read
            # self.condor_master.stdout/stderr themselves.
            self.condor_master = subprocess.Popen(
                ["condor_master", "-f"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            logger.debug(
                "Started condor_master (pid {}) for {}".format(
                    self.condor_master.pid, self
                )
            )

        self.state = PersonalPoolState.STARTING

        return self

    def _daemons(self) -> Set[str]:
        """
        The daemon names listed in this pool's ``DAEMON_LIST``.

        HTCondor lists may be separated by commas and/or whitespace, so both
        are treated as separators and empty entries are dropped.
        """
        return set(self.get_config_val("DAEMON_LIST").replace(",", " ").split())

    @_skip_if(PersonalPoolState.READY)
    def _wait_for_ready(self, timeout=120):
        """
        Poll ``condor_who`` until the pool is ready and every daemon in
        ``DAEMON_LIST`` is reported ``Alive``.

        Raises
        ------
        TimeoutError
            If that does not happen within ``timeout`` seconds.
        """
        daemons = self._daemons()
        master_log_path = self._master_log

        logger.debug(
            "Starting up daemons for {}, waiting for: {}".format(
                self, " ".join(sorted(daemons))
            )
        )

        start = time.time()
        while time.time() - start < timeout:
            time_to_give_up = int(timeout - (time.time() - start))

            # if the master log does not exist yet, we can't use condor_who
            if not master_log_path.exists():
                logger.debug(
                    "MASTER_LOG at {} does not yet exist for {}, retrying in 1 seconds (giving up in {} seconds).".format(
                        master_log_path, self, time_to_give_up
                    )
                )
                time.sleep(1)
                continue

            who = self.run_command(
                shlex.split(
                    "condor_who -wait:10 'IsReady && STARTD_State =?= \"Ready\"'"
                ),
            )
            if who.stdout.strip() == "":
                logger.debug(
                    "condor_who stdout was unexpectedly blank for {}, retrying in 1 second (giving up in {} seconds). condor_who stderr:\n{}".format(
                        self, time_to_give_up, who.stderr
                    )
                )
                time.sleep(1)
                continue

            who_ad = classad.parseOne(who.stdout)

            if (
                who_ad.get("IsReady")
                and who_ad.get("STARTD_State") == "Ready"
                and all(who_ad.get(d) == "Alive" for d in daemons)
            ):
                self.state = PersonalPoolState.READY
                return self

            logger.debug(
                "{} is waiting for daemons to be ready (giving up in {} seconds)".format(
                    self, time_to_give_up
                )
            )

        raise TimeoutError("Standup for {} failed".format(self))

    def who(self) -> classad.ClassAd:
        """
        Return the result of ``condor_who -quick``,
        as a :class:`classad.ClassAd`.
        If ``condor_who -quick`` fails, or the output can't be parsed into
        a sensible who ad, this method returns an empty ad.
        """
        who = self.run_command(["condor_who", "-quick"])

        try:
            parsed = classad.parseOne(who.stdout)

            # If there's no MASTER key in the parsed ad, it indicates
            # that we actually got the special post-shutdown message
            # from condor_who and should act like there's nothing there.
            if "MASTER" not in parsed:
                return classad.ClassAd()

            return parsed
        except Exception:
            # Deliberate best-effort: an unparseable answer means "nothing running".
            return classad.ClassAd()

    def _condor_master_is_alive(self) -> bool:
        """Whether condor_master is running (our Popen if we own it, else condor_who)."""
        if self.condor_master is not None:
            return self.condor_master.poll() is None
        else:
            return bool(self.who())

    def _master_pid(self) -> Optional[int]:
        """
        The pid of condor_master, or ``None`` if this pool did not start the
        master itself and ``condor_who`` does not report one.
        """
        if self.condor_master is not None:
            return self.condor_master.pid

        pid = self.who().get("MASTER_PID")
        return int(pid) if pid is not None else None

    def _daemon_pids(self) -> List[int]:
        """The pids of every daemon (``*_PID`` attribute) reported by condor_who."""
        return [int(v) for k, v in self.who().items() if k.endswith("_PID")]

    def _is_ready(self) -> bool:
        """Whether condor_who reports the pool as ready."""
        return bool(self.who().get("IsReady", False))

    @_skip_if(
        PersonalPoolState.UNINITIALIZED,
        PersonalPoolState.INITIALIZED,
        PersonalPoolState.STOPPING,
        PersonalPoolState.STOPPED,
    )
    def stop(self):
        """
        Stop the personal condor, bringing it from the ``READY`` state to
        ``STOPPED``.

        Does nothing if the pool is detached.

        Returns
        -------
        self : PersonalPool
            This method returns ``self``.
        """
        if self._detach:
            logger.debug("Will not stop {} because it is detached".format(self))
            return self

        logger.info("Stopping {}".format(self))

        self.state = PersonalPoolState.STOPPING

        self._condor_off()
        self._wait_for_master_to_terminate()

        self.state = PersonalPoolState.STOPPED

        logger.info("Stopped {}".format(self))

        return self

    def _condor_off(self):
        """
        Ask condor_master to shut down via ``condor_off``.

        Falls back to terminating the master directly if ``condor_off`` fails
        or does not finish within 30 seconds.
        """
        if not self._condor_master_is_alive():
            return

        try:
            off = self.run_command(["condor_off", "-daemon", "master"], timeout=30)
        except subprocess.TimeoutExpired:
            logger.error("condor_off timed out for {}".format(self))
            self._terminate_condor_master()
            return

        if off.returncode != 0:
            logger.error(
                "condor_off failed for {}, exit code: {}, stderr: {}".format(
                    self, off.returncode, off.stderr
                )
            )
            self._terminate_condor_master()
            return

        logger.debug("condor_off succeeded for {}: {}".format(self, off.stdout))

    def _wait_for_master_to_terminate(self, kill_after: int = 60, timeout: int = 120):
        """
        Block until condor_master exits.

        After ``kill_after`` seconds the whole condor system is forcibly
        killed; after ``timeout`` seconds a :class:`TimeoutError` is raised.
        """
        logger.debug(
            "Waiting for condor_master (pid {}) for {} to terminate".format(
                self._master_pid(), self
            )
        )

        start = time.time()
        killed = False
        while True:
            if not self._condor_master_is_alive():
                break

            elapsed = time.time() - start

            if not killed:
                logger.debug(
                    "condor_master for {} has not terminated yet, will kill in {} seconds.".format(
                        self, int(kill_after - elapsed)
                    )
                )

            if elapsed > kill_after and not killed:
                self._kill_condor_system()
                killed = True

            if elapsed > timeout:
                raise TimeoutError(
                    "Timed out while waiting for condor_master to terminate."
                )

            time.sleep(1)

        logger.debug("condor_master for {} has terminated.".format(self))

    def _terminate_condor_master(self):
        """Send a terminate request/signal to condor_master, if it is running."""
        if not self._condor_master_is_alive():
            return

        if self.condor_master is not None:
            pid = self.condor_master.pid
            self.condor_master.terminate()
        else:
            pid = self._master_pid()
            if pid is None:
                # condor_who no longer reports a master; nothing to signal.
                return
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # The master exited between the liveness check and the signal.
                return

        logger.debug(
            "Sent terminate signal to condor_master (pid {}) for {}".format(pid, self)
        )

    def _kill_condor_system(self):
        """
        Forcibly kill every daemon condor_who reports, plus the condor_master
        this object started (if any). Does nothing if the master is not running.
        """
        if not self._condor_master_is_alive():
            return

        # SIGKILL is not defined on Windows; fall back to SIGTERM there
        # (see the os.kill documentation for Windows behavior).
        kill_signal = getattr(signal, "SIGKILL", signal.SIGTERM)

        pids = self._daemon_pids()
        for pid in pids:
            try:
                os.kill(pid, kill_signal)
            except ProcessLookupError:
                # That daemon already exited on its own; keep killing the rest.
                pass

        if self.condor_master is not None:
            # Kill the master we spawned even if condor_who reported nothing.
            self.condor_master.kill()

        logger.debug(
            "Sent kill signals to condor daemons (pids {}) for {}".format(
                ", ".join(map(str, pids)), self
            )
        )

    def run_command(
        self,
        args: List[str],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines: bool = True,
        **kwargs
    ) -> subprocess.CompletedProcess:
        """
        Execute a command in a subprocess against this personal pool,
        using :func:`subprocess.run` with good defaults for executing
        HTCondor commands.
        All of the keyword arguments of this function are passed directly to
        :func:`subprocess.run`.

        Parameters
        ----------
        args
            The command to run, and its arguments, as a list of strings.
            Each element is converted with ``str`` first.
        kwargs
            All keyword arguments
            (including ``stdout``, ``stderr``, and ``universal_newlines``)
            are passed to :func:`subprocess.run`.

        Returns
        -------
        completed_process : subprocess.CompletedProcess
            Captured ``stdout``/``stderr`` have trailing whitespace stripped;
            streams that were not captured are left as ``None``.
        """
        with self.use_config():
            p = subprocess.run(
                list(map(str, args)),
                stdout=stdout,
                stderr=stderr,
                universal_newlines=universal_newlines,
                **kwargs,
            )

        # Only strip streams that were actually captured.
        if p.stdout is not None:
            p.stdout = p.stdout.rstrip()
        if p.stderr is not None:
            p.stderr = p.stderr.rstrip()

        return p

    @property
    def _master_log(self) -> Path:
        """Path of the pool's MASTER_LOG file."""
        return Path(self.get_config_val("MASTER_LOG"))

    def get_config_val(self, macro: str, default: Optional[str] = None) -> str:
        """
        Get the value of a configuration macro.
        The value will be "evaluated", meaning that other configuration macros
        or functions inside it will be expanded.

        Parameters
        ----------
        macro
            The configuration macro to look up the value for.
        default
            If not ``None``, and the config macro has no value, return this instead.
            If ``None``, a :class:`KeyError` will be raised instead.

        Returns
        -------
        value : str
            The evaluated value of the configuration macro.
        """
        with self.use_config():
            try:
                return htcondor.param[macro]
            except KeyError:
                if default is not None:
                    return default
                raise

    @classmethod
    def attach(cls, local_dir: Optional[Path] = None) -> "PersonalPool":
        """
        Make a new :class:`PersonalPool` attached to an existing personal pool
        that is already running in ``local_dir``.

        The existing pool's configuration file is used as-is (it is not
        rewritten), and ``CONDOR_CONFIG`` is then set globally to point at it.

        Parameters
        ----------
        local_dir
            The local directory for the existing personal pool.

        Returns
        -------
        self : PersonalPool
            This method returns ``self``.
        """
        # use_config=False: initializing would overwrite the running pool's
        # condor_config with default settings, discarding its custom config.
        pool = cls(local_dir=local_dir, use_config=False)

        if not pool._is_ready():
            raise Exception(
                "There is not already a running HTCondor instance for {}.".format(pool)
            )

        logger.debug("Setting CONDOR_CONFIG globally for {}".format(pool))
        pool.use_config().set()

        pool.state = PersonalPoolState.READY

        return pool

    def detach(self) -> "PersonalPool":
        """
        Detach the personal pool (as in the constructor argument),
        and return ``self``.
        """
        self._detach = True
        return self


def _try_relative_to(path: Path, to: Path) -> Path:
    """``path`` relative to ``to``, or ``path`` unchanged if it is not under ``to``."""
    try:
        return path.relative_to(to)
    except ValueError:
        return path


class SetCondorConfig:
    """
    A context manager. Inside the block, the Condor config file is the one given
    to the constructor. After the block, it is reset to whatever it was before
    the block was entered.
    """

    def __init__(self, config_file: Path):
        """
        Parameters
        ----------
        config_file
            The path to an HTCondor configuration file.
        """
        self.config_file = Path(config_file)
        # CONDOR_CONFIG as it was when set() was last called (None if unset).
        self.previous_value = None

    def set(self):
        """Set ``CONDOR_CONFIG`` and tell HTCondor to reconfigure."""
        self.previous_value = os.environ.get("CONDOR_CONFIG", None)
        _set_env_var("CONDOR_CONFIG", str(self.config_file))

        htcondor.reload_config()

    def unset(self):
        """Un-set ``CONDOR_CONFIG`` and tell HTCondor to reconfigure."""
        if self.previous_value is not None:
            _set_env_var("CONDOR_CONFIG", self.previous_value)
            htcondor.reload_config()
        else:
            # NOTE(review): reload_config() is not called on this branch, so
            # htcondor.param may keep reflecting the config that was set —
            # confirm whether skipping the reload here is intentional.
            _unset_env_var("CONDOR_CONFIG")

    def __enter__(self):
        self.set()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.unset()


def _set_env_var(key: str, value: str):
    """Set environment variable ``key`` to ``value`` in this process."""
    os.environ[key] = value


def _unset_env_var(key: str):
    """Remove environment variable ``key`` if it is set; no-op otherwise."""
    os.environ.pop(key, None)