#!/bin/sh
#
# Periodically read the log files of PBS and put mark files
# for jobs which have finished.
# If log files are not available, scan PBS for finished (absent) jobs
# and put mark files for them.
#
# usage: scan_pbs_job control_dir ...

# ARC1 passes the config file first.
if [ "$1" = "--config" ]; then shift; ARC_CONFIG=$1; shift; fi

# The first control_dir is also used for storing this script's own state file
if [ -z "$1" ] ; then exit 1 ; fi
control_dir=$1
control_dirs=
while [ $# -gt 0 ] ; do
    control_dirs="${control_dirs} \"$1\""
    shift
done

joboption_lrms="pbs"
lrms_options="pbs_bin_path pbs_log_path"
queue_options="pbs_queue_node nodememory"

# define paths and config parser
basedir=`dirname $0`
basedir=`cd $basedir > /dev/null && pwd` || exit $?

. "${basedir}/lrms_common.sh"

# include common scan functions
. "${pkgdatadir}/scan_common.sh" || exit $?

# run common init
#  * parse config
#  * load LRMS-specific env
#  * set common variables
common_init

# Define gm-kick location
GMKICK=${pkglibexecdir}/gm-kick

# Log system performance
if [ ! -z "$perflogdir" ]; then
    perflog_common "$perflogdir" "$CONFIG_controldir"
fi

pbs_log_dir="${CONFIG_pbs_log_path:-/var/spool/pbs/server_logs}"

my_id=`id -u`

# The state file remembers which log file was scanned last and how many
# matching lines of it have already been processed.
state_file=$control_dir/pbs_log_scan.`id -un`
lines=`cat "$state_file" 2>/dev/null`
ldt=`echo $lines | awk '{split($0,field," ");print field[1]}'`
lines=`echo $lines | awk '{split($0,field," ");print field[2]}'`
lines_skip=$(( $lines + 0 ))
ldate=$(( $ldt + 0 ))
if [ -z "$lines_skip" ] ; then lines_skip='0' ; fi
if [ -z "$ldate" ] ; then ldate='0' ; fi

whole_line=

# Find the job.ID.local file whose localid matches $job_id
find_by_local() {
    eval "set -- $control_dirs"
    for ctr_dir in "$@"; do
        find ${ctr_dir}/processing -name 'job.*.status' -print0 \
        | sed 's/processing\/job\.\([^\.]*\)\.status/job.\1.local/g' \
        | xargs -0 grep -F -l $whole_line "localid=$job_id" 2>/dev/null
    done \
    | head -n 1
}

# Find the job.ID.local file whose job.ID.grami matches $job_id
find_by_grami() {
    eval "set -- $control_dirs"
    for ctr_dir in "$@"; do
        find ${ctr_dir}/processing -name 'job.*.status' -print0 \
        | sed 's/processing\/job\.\([^\.]*\)\.status/job.\1.grami/g' \
        | xargs -0 grep -F -l $whole_line "joboption_jobid=$job_id" 2>/dev/null
    done \
    | sed 's/\.grami$/.local/' \
    | head -n 1
}

# set_job_vars takes a line from a PBS log, already split at ";", and returns
# the pieces in pbs_date, pbs_code, pbs_server, pbs_job, job_id, job_message
# and rest_line
set_job_vars() {
    pbs_date=$1
    pbs_code=$2
    pbs_server=$3
    pbs_job=$4
    job_id=$5
    job_message=$6
    rest_line=$7
}

#
# Main function for processing one PBS log.
# Extracts log lines with code 0010 (job exited) and 0008 (job killed).
#
# TODO: this should be split into smaller functions
process_log_file () {
    eval "set -- $control_dirs"
    # We grep for finished jobs, then use tail to skip already processed lines.
    # OBS: deleted jobs have a 0008 message with not much info in it. A 0010
    #      message may follow (or not) with full usage stats. By that time the
    #      job has already been processed, so this info is ignored!
    # TODO: make log scanning more intelligent.
    exited_killed_jobs=`egrep '^[^;]*;(0010|16);[^;]*;Job;|^[^;]*;(00)?08;[^;]*;Job;[^;]*;Exit_status=|^[^;]*;(00)?08;[^;]*;Job;[^;]*;Job deleted' ${lname} | tail -n+$(( $lines_skip + 1 ))`
    # TODO: should we record processed lines before the jobs have actually
    #       been processed? What if the last job only has half a record?
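    # For reference, a server log record is a ";"-separated line of the form
    #   <date>;<event code>;<server>;<object type>;<object id>;<message>
    # so a matching 0010 record could look like (illustrative example only;
    # the exact layout depends on the PBS/TORQUE version):
    #   03/22/2015 14:04:05;0010;PBS_Server;Job;1234.pbs.example.org;Exit_status=0 resources_used.cput=00:00:01 resources_used.mem=420kb resources_used.vmem=4940kb resources_used.walltime=00:00:02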
    # wc -l counts newlines; the command substitution above strips the final
    # newline, so the last record is not counted here and will be scanned
    # again next time (jobs already handled are skipped via the .lrms_done
    # check below).
    new_lines=`echo -n "$exited_killed_jobs" | wc -l`
    # nothing to do if no new finished jobs were found
    # (return, not continue: we are in a function, not in a loop body)
    [ "x$exited_killed_jobs" = x ] && return
    lines_processed=$(( $lines_skip + $new_lines ))
    if [ "$lines_processed" -lt '0' ] ; then lines_processed=0; fi
    echo "$cname $lines_processed" > $state_file
    exited_killed_jobs=`echo "$exited_killed_jobs" | sort -u`

    # force word splitting to happen only on newlines
    old_IFS=$IFS; IFS='
'
    for job in $exited_killed_jobs; do
        # Split the line into fields by forcing word splitting to happen on ";"
        IFS=";"
        set_job_vars $job
        IFS=$old_IFS
        # Try to extract the exit status reported by PBS (note: if the
        # executable fails, its exit code is the one that goes to PBS)
        exit_code=`echo "$job_message" | sed -n 's/^.*Exit_status=\([-0-9]*\).*/\1/p'`
        # Check if the job id has a suffix
        echo "$job_id" | grep -q -F .
        if [ ! $? = '0' ] ; then
            # no dot in the id: match it against whole lines only
            whole_line=-x
        else
            # keep only the first two dot-separated components of the id
            job_id=`echo "$job_id" | awk '{split($0,field,".");print field[1]"."field[2]}'`
            whole_line=
        fi
        # look for this id first in job.ID.local, then in job.ID.grami
        name=`find_by_local`
        if [ -z "$name" ]; then
            name=`find_by_grami`
            if [ -z "$name" ]; then continue; fi
        fi
        if [ "$my_id" != '0' ] ; then
            if [ ! -O "$name" ] ; then continue ; fi
        fi
        uid=$(get_owner_uid "$name")
        [ -z "$uid" ] && { log "Failed to stat $name"; continue; }
        base_name=`echo "$name" 2>/dev/null | sed -n 's/\.local$//p'`
        if [ -z "${base_name}" ] ; then continue ; fi
        gridid=`echo "$base_name" 2>/dev/null | sed -n 's/.*\.\([^\.]*\)$/\1/'`
        # check if the job was already reported
        if [ -f "${base_name}.lrms_done" ] ; then continue ; fi
        statusfile=`echo "$name" 2>/dev/null | sed -n 's/job\.\([^\.]*\)\.local$/processing\/job.\1.status/p'`
        # more protection - check if A-REX thinks the job is still running
        egrep 'INLRMS|SUBMIT|CANCELING' "$statusfile" >/dev/null 2>&1
        if [ ! $? = '0' ] ; then continue ; fi
        # So far only the exit status reported by PBS is available;
        # it would be nice to have the exit code of the main executable too.
        exitcode=''
        # get the session directory of this job
        sessiondir=`grep -h '^sessiondir=' "${base_name}.local" | sed 's/^sessiondir=\(.*\)/\1/'`
        diagfile="${sessiondir}.diag"
        commentfile="${sessiondir}.comment"
        if [ -z "$sessiondir" ] ; then
            log "Failed to determine the path of the job's session directory"
        else
            # have a chance to obtain the exit code
            if [ -z "${RUNTIME_NODE_SEES_FRONTEND}" ] ; then
                # In case of a non-NFS setup it may take some time until the
                # diagnostics file is delivered. Wait for it, 20 tries 10 s
                # apart (a bit over 3 minutes in total).
                # OBS: exitcode may never appear in the .diag file if the job
                #      was killed. The full delay is then paid for every such job!
                diag_tries=0
                while [ "$diag_tries" -lt 20 ] ; do
                    job_read_diag # uses $sessiondir, $uid
                    if [ ! -z "$exitcode" ] ; then break ; fi
                    sleep 10
                    diag_tries=$(( $diag_tries + 1 ))
                    log "no exitcode in diag file $diagfile (try $diag_tries of 20)"
                done
            else
                job_read_diag # uses $sessiondir, $uid
            fi
        fi
        # Try to obtain a comment from PBS, if any
        pbs_comment=$(do_as_uid "$uid" "tail -n 1 '$commentfile'")
        save_commentfile "$uid" "$commentfile" "${base_name}.errors"
        # Extract resource usage values reported by PBS
        walltime=`echo "$job_message" | sed -n 's/^.*resources_used.walltime=\(\([0-9]*:\)*[0-9][0-9]\).*/\1/p'`
        cputime=`echo "$job_message" | sed -n 's/^.*resources_used.cput=\(\([0-9]*:\)*[0-9][0-9]\).*/\1/p'`
        mem=`echo "$job_message" | sed -n 's/^.*resources_used.mem=\([0-9]*\)kb.*/\1/p'`
        vmem=`echo "$job_message" | sed -n 's/^.*resources_used.vmem=\([0-9]*\)kb.*/\1/p'`
        # Convert the PBS end time to UTC and store it as seconds
        date_to_utc_seconds "$pbs_date"
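        # Illustrative conversion (the helpers come from scan_common.sh, and
        # the date format shown is an assumption): date_to_utc_seconds turns
        # e.g. "03/22/2015 14:04:05" into epoch seconds in
        # $return_date_seconds, and seconds_to_mds_date below renders such
        # seconds in the YYYYMMDDHHMMSSZ (MDS) format, e.g. "20150322140405Z".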
        if [ ! -z "$return_date_seconds" ]; then
            # Convert from seconds to YYYYMMDDHHMMSSZ
            seconds_to_mds_date "$return_date_seconds"
            endtime=$return_mds_date
            # Find out for how many seconds the job executed
            interval_to_seconds "$walltime"
            if [ ! -z "$return_interval_seconds" ]; then
                # Convert from seconds to YYYYMMDDHHMMSSZ
                seconds_to_mds_date $(( $return_date_seconds - $return_interval_seconds ))
                starttime=$return_mds_date
            fi
        fi

        # Values to write to diag. These will override values already written.
        interval_to_seconds "$walltime"
        [ -n "$return_interval_seconds" ] && WallTime=$return_interval_seconds
        interval_to_seconds "$cputime"
        [ -n "$return_interval_seconds" ] && UserTime=$return_interval_seconds
        [ -n "$return_interval_seconds" ] && KernelTime=0
        [ -n "$mem" ] && UsedMemory=$mem
        [ -n "$vmem" ] && TotalMemory=$vmem
        [ -n "$starttime" ] && LRMSStartTime=$starttime
        [ -n "$endtime" ] && LRMSEndTime=$endtime
        [ -n "$pbs_comment" ] && LRMSMessage=$pbs_comment
        [ -n "$exit_code" ] && LRMSExitcode=$exit_code
        job_write_diag

        if [ -z "$exitcode" ] ; then
            # No exit code from the job itself means the job was most probably killed
            if [ -z "$exit_code" ] ; then exit_code='-1'; fi
            if [ "$exit_code" = '0' ] ; then
                echo "Job $job_id failed but PBS has not noticed that" 1>&2
                echo "-1 Job failed but PBS reported 0 exit code." > "${base_name}.lrms_done"
            elif [ -z "$pbs_comment" ] ; then
                echo "Job $job_id failed with PBS exit code $exit_code" 1>&2
                echo "$exit_code Job was killed by PBS." > "${base_name}.lrms_done"
            else
                echo "Job $job_id failed with PBS exit code $exit_code" 1>&2
                echo "$exit_code $pbs_comment" > "${base_name}.lrms_done"
            fi
        else
            if [ -z "$exit_code" ] ; then exit_code='-1'; fi
            if [ ! "$exitcode" = 0 ] ; then
                if [ "$exit_code" = '0' ] ; then exit_code='-1'; fi
                echo "Job $job_id failed with exit code $exitcode, PBS reported $exit_code." 1>&2
                echo "$exit_code Job failed with exit code $exitcode." > "${base_name}.lrms_done"
            else
                if [ ! "$exit_code" = '0' ] ; then
                    echo "Job finished properly but PBS reported $exit_code." 1>&2
                    if [ -z "$pbs_comment" ] ; then
                        echo "$exit_code Job was killed by PBS." > "${base_name}.lrms_done"
                    else
                        echo "$exit_code $pbs_comment" > "${base_name}.lrms_done"
                    fi
                else
                    # echo "Job finished without errors." 1>&2
                    echo "0" > "${base_name}.lrms_done"
                fi
            fi
        fi
        # wake up the grid manager
        ${GMKICK} -j "${gridid}" "${base_name}.local"
    done
    IFS=$old_IFS
}

readable_logs=no

# Check $pbs_log_dir for readable log files (their names are date stamps);
# if any are found, process them and update the state file.
if [ ! -z "${pbs_log_dir}" ] ; then
    for cname in `ls -1 ${pbs_log_dir}/ 2>/dev/null | grep '^[0-9]*$'` ; do
        lname="${pbs_log_dir}/$cname"
        if [ ! -r "$lname" ] ; then continue ; fi
        readable_logs=yes
        # skip log files older than the last one scanned; start from the
        # beginning of log files newer than that
        if [ "$cname" -lt "$ldate" ] ; then
            continue
        elif [ "$cname" -gt "$ldate" ] ; then
            lines_skip=0
        fi
        echo "Date: $cname"
        last_modified=`stat $lname | grep Modify`
        process_log_file
    done
fi
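# The state file written by process_log_file holds a single line of the form
# "<log file name> <matching lines processed>", e.g.
#   20150322 1234
# meaning that the first 1234 matching records of server log 20150322 have
# already been handled.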
# main loop: stay here for up to 60 seconds if the log is still being
# updated while we are reading it.
if [ "$readable_logs" = 'yes' ] ; then
    time_count=0
    while true ; do
        new_modified=`stat $lname | grep Modify`
        if [ "$new_modified" != "$last_modified" ] ; then
            last_modified="$new_modified"
            lines=`cat "$state_file" 2>/dev/null`
            ldt=`echo $lines | awk '{split($0,field," ");print field[1]}'`
            lines=`echo $lines | awk '{split($0,field," ");print field[2]}'`
            lines_skip=$(( $lines + 0 ))
            ldate=$(( $ldt + 0 ))
            process_log_file
        fi
        sleep 10
        time_count=$(( $time_count + 10 ))
        if [ "$time_count" -ge 60 ] ; then break ; fi
    done
    exit 0
fi

# If no PBS logs were found, fall back to plain 'qstat'
eval "set -- $control_dirs"

# Get all running jobs
pidslist=`mktemp "$TMPDIR/qstat.XXXXXX"`
if [ ! "$?" = '0' ] ; then
    # FS problems?
    # TODO: debug output here
    sleep 60
    exit 1
fi

${PBS_BIN_PATH}/qstat -a 2>/dev/null 1>"$pidslist"
if [ ! "$?" = '0' ] ; then
    rm -f "$pidslist"
    # PBS server down?
    sleep 60
    exit 1
fi

# Drop jobs whose state column (field 10 of 'qstat -a' output) is "C",
# i.e. already completed
exclude_completed () {
    awk '$10!="C"{print $0}'
}

# Keep only the numeric ids of jobs still known to PBS
pids=`cat "$pidslist" | grep '^[0-9][0-9]*\.' | exclude_completed | sed 's/^\([0-9][0-9]*\).*/\1/'`
rm -f "$pidslist"

# Go through the control directories
for ctr_dir in "$@" ; do
    if [ ! -z "$perflogdir" ]; then
        start_ts=`date +%s.%N`
    fi
    # Obtain the local ids stored in job.*.local
    ids=`find ${ctr_dir}/processing -name 'job.*.status' -print0 \
         | sed 's/processing\/job\.\([^\.]*\)\.status/job.\1.local/g' \
         | xargs -0 grep -h "^localid=" 2>/dev/null | sed 's/^localid=\([0-9]*\).*/\1/'`
    if [ -z "$ids" ] ; then continue ; fi
    if [ ! -z "$perflogdir" ]; then
        stop_ts=`date +%s.%N`
        t=`awk "BEGIN { printf \"%.3f\", ${stop_ts}-${start_ts} }"`
        echo "[`date +%Y-%m-%d\ %T`] scan-pbs-job, ControlDirTraversal: $t" >> $perflogfile
    fi
    # compare them to the running jobs and collect the missing ones
    bids=
    for id in $ids ; do
        found=`echo "$pids" | grep "^$id$"`
        if [ -z "$found" ] ; then
            bids="$bids $id"
        fi
    done
    if [ ! -z "$perflogdir" ]; then
        start_ts=`date +%s.%N`
    fi
    # go through the missing ids
    for id in $bids ; do
        # find the grid job corresponding to the current local id
        jobfile=`find ${ctr_dir}/processing -name 'job.*.status' -print0 \
                 | sed 's/processing\/job\.\([^\.]*\)\.status/job.\1.local/g' \
                 | xargs -0 grep -F -l "localid=$id." 2>/dev/null`
        if [ -z "$jobfile" ] ; then continue ; fi
        if [ "$my_id" != '0' ] ; then
            if [ ! -O "$jobfile" ] ; then continue ; fi
        fi
        uid=$(get_owner_uid "$jobfile")
        [ -z "$uid" ] && { log "Failed to stat $jobfile"; continue; }
        # extract the grid id
        gridid=`basename "$jobfile" '.local' | sed 's/^job\.//'`
        donefile="${ctr_dir}/job.${gridid}.lrms_done"
        if [ -f "$donefile" ] ; then continue ; fi
        statusfile="${ctr_dir}/processing/job.${gridid}.status"
        if [ ! -f "$statusfile" ] ; then continue ; fi
        status=`cat "$statusfile"`
        if [ "$status" != "INLRMS" ] && [ "$status" != "CANCELING" ]; then continue ; fi
        # get the session directory of this job
        session=`grep -h '^sessiondir=' "$jobfile" | sed 's/^sessiondir=\(.*\)/\1/'`
        if [ ! -z "$session" ] ; then
            # have a chance to obtain the exit code from the .diag file
            diagfile="${session}.diag"
            exitcode=$(do_as_uid "$uid" "grep '^exitcode=' '$diagfile'" | sed 's/^exitcode=//')
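            # The .diag file consists of key=value lines written next to the
            # session directory; the line of interest here looks like, e.g.
            #   exitcode=0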
            if [ ! -z "$exitcode" ] ; then
                # the job finished and the exit code is known
                save_commentfile "$uid" "${session}.comment" "${ctr_dir}/job.${gridid}.errors"
                echo "$exitcode Executable finished with exit code $exitcode" > "$donefile"
                ${GMKICK} -j "$gridid" "$jobfile"
                echo "Job $gridid finished with exit code $exitcode"
                continue
            fi
        fi
        # the job has probably finished but the exit code is not known;
        # count the scans in which it stayed missing and only declare it
        # lost after more than 5 of them
        exitcode='-1'
        countfile="${ctr_dir}/job.${gridid}.lrms_job"
        counter=0
        if [ -f "$countfile" ] ; then
            counter=`cat "$countfile"`
            counter=$(( $counter + 1 ))
        fi
        if [ "$counter" -gt 5 ] ; then
            rm -f "$countfile"
            save_commentfile "$uid" "${session}.comment" "${ctr_dir}/job.${gridid}.errors"
            echo "$exitcode Job was lost with unknown exit code" > "$donefile"
            ${GMKICK} -j "$gridid" "$jobfile"
            echo "Job $gridid finished with unknown exit code"
        else
            echo "$counter" > "$countfile"
        fi
    done
    if [ ! -z "$perflogdir" ]; then
        stop_ts=`date +%s.%N`
        t=`awk "BEGIN { printf \"%.3f\", ${stop_ts}-${start_ts} }"`
        echo "[`date +%Y-%m-%d\ %T`] scan-pbs-job, JobProcessing: $t" >> $perflogfile
    fi
    # go through the existing ids
    for id in $pids ; do
        # find the grid job corresponding to the current local id
        jobfile=`find ${ctr_dir} -name 'job.*.local' -print0 | xargs -0 grep -F -l "localid=$id." 2>/dev/null`
        if [ -z "$jobfile" ] ; then continue ; fi
        gridid=`basename "$jobfile" '.local' | sed 's/^job\.//'`
        countfile="${ctr_dir}/job.${gridid}.lrms_job"
        # the job is still running - reset its failure counter
        rm -f "$countfile"
    done
done

sleep 60
exit 0