#!/usr/bin/env python3 # Copyright 2021 HTCondor Team, Computer Sciences Department, # University of Wisconsin-Madison, WI. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import sys import time import signal import logging import multiprocessing import pprint import queue from functools import partial from pathlib import Path import htcondor # adstash library in libexec sys.path.append(htcondor.param.get('libexec', '/usr/libexec/condor')) from adstash import history, utils, config _MAIN_PID = os.getpid() def signal_handler(signal_queue, signum, frame): """ Catch signals from main program and send useful messages back """ # Call the default signal handler for child processes try: if os.getpid() != _MAIN_PID: raise RuntimeError("Custom signal handler called from child process") except (NameError, RuntimeError): signal.signal(signum, signal.SIG_DFL) os.kill(os.getpid(), signum) return try: signame = signal.Signals(signum).name signal_msg = f"Got signal {signal.Signals(signum).name} ({signum}) at frame {frame}" except ValueError: signame = signum signal_msg = f"Got signal {signum} at frame {frame}" signal_queue.put({"signal": signum, "signame": signame, "signal_msg": signal_msg}, block=False) def stash_ads(args): """ Gather ads and stash them in Elasticsearch """ starttime = time.time() # Get all the schedd ads if args.schedd_history: schedd_ads = [] schedd_ads = utils.get_schedds(args) logging.info(f"There are {len(schedd_ads)} schedds to query") # Get all the startd ads if args.startd_history: startd_ads = [] startd_ads = utils.get_startds(args) logging.info(f"There are {len(startd_ads)} startds to query.") # Process ads with multiprocessing.Pool(processes=args.threads, maxtasksperchild=1) as pool: metadata = utils.collect_process_metadata() if args.schedd_history: history.process_histories( schedd_ads=schedd_ads, starttime=starttime, pool=pool, args=args, metadata=metadata, ) if args.startd_history: history.process_histories( startd_ads=startd_ads, starttime=starttime, pool=pool, args=args, metadata=metadata, ) processing_time = int(time.time() - starttime) return processing_time def main(): """ Set up condor_adstash and run main loop """ # get args args = config.get_config(sys.argv[1:]) # dry_run implies read_only args.read_only = args.read_only or args.dry_run # set up logging utils.set_up_logging(args) logging.warning( "******************************************************") logging.warning(f"** {sys.argv[0]} (CONDOR_ADSTASH) STARTING UP") logging.warning(f"** {Path(__file__).resolve()}") logging.warning(f"** Configuration: subsystem:{args.process_name} type:ADSTASH class:DAEMON") logging.warning(f"** {htcondor.version()}") logging.warning(f"** {htcondor.platform()}") logging.warning(f"** PID: {os.getpid()}") logging.warning( "******************************************************") # pretty print a list of args (but no passwords) if debugging safe_args = vars(args).copy() if "es_password" in safe_args: safe_args["es_password"] = "" logging.debug(f"Using config: {pprint.pformat(safe_args)}") # catch signals signal_queue = queue.Queue() signal.signal(signal.SIGHUP, partial(signal_handler, signal_queue)) signal.signal(signal.SIGTERM, partial(signal_handler, signal_queue)) signal.signal(signal.SIGINT, partial(signal_handler, signal_queue)) signal.signal(signal.SIGQUIT, partial(signal_handler, signal_queue)) # register with the condor_master unless running standalone if not args.standalone: htcondor.set_subsystem(args.process_name, htcondor.SubsystemType.Daemon) try: logging.info("Sending DC_SET_READY message to condor_master") htcondor.set_ready_state("Ready") except Exception: logging.warning("Could not send DC_SET_READY message to condor_master") else: logging.warning("Running in standalone mode, will exit after one loop") # main loop while True: logging.warning("Starting adstash loop") last_loop_time = stash_ads(args) logging.warning(f"Total processing time for this loop: {last_loop_time/60:.2f} mins") if args.standalone: break next_loop_time = max(args.sample_interval - last_loop_time, 20) logging.info(f"Next adstash loop scheduled in {next_loop_time} seconds") try: got_signal = signal_queue.get( block=True, timeout=next_loop_time ) except queue.Empty: continue # refresh config on SIGHUP, exit on other signals if got_signal["signal"] == signal.SIGHUP: logging.warning( f"{got_signal['signal_msg']}: Refreshing config and running next loop." ) args = config.get_config(sys.argv[1:]) else: logging.warning(f"{got_signal['signal_msg']}: Exiting.") break logging.warning(f"**** {sys.argv[0]} (CONDOR_ADSTASH) pid {os.getpid()} EXITING WITH STATUS 0") if __name__ == "__main__": main()