/******************************************************************************/
/*                                                                            */
/*                       X r d X r o o t d J o b . c c                        */
/*                                                                            */
/* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University  */
/*                            All Rights Reserved                             */
/*   Produced by Andrew Hanushevsky for Stanford University under contract    */
/*              DE-AC02-76-SFO0515 with the Department of Energy              */
/*                                                                            */
/* This file is part of the XRootD software suite.                            */
/*                                                                            */
/* XRootD is free software: you can redistribute it and/or modify it under    */
/* the terms of the GNU Lesser General Public License as published by the     */
/* Free Software Foundation, either version 3 of the License, or (at your     */
/* option) any later version.                                                 */
/*                                                                            */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT      */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or      */
/* FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public       */
/* License for more details.                                                  */
/*                                                                            */
/* You should have received a copy of the GNU Lesser General Public License   */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file  */
/* COPYING (GPL license).  If not, see <http://www.gnu.org/licenses/>.        */
/*                                                                            */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without  */
/* specific prior written permission of the institution or contributor.
*/ /******************************************************************************/ #include #include #include #include #include "Xrd/XrdLink.hh" #include "Xrd/XrdScheduler.hh" #include "XrdOuc/XrdOucProg.hh" #include "XrdOuc/XrdOucStream.hh" #include "XrdSys/XrdSysPlatform.hh" #include "XrdXrootd/XrdXrootdJob.hh" #include "XrdXrootd/XrdXrootdResponse.hh" #include "XrdXrootd/XrdXrootdTrace.hh" #include "XProtocol/XProtocol.hh" #include "XProtocol/XPtypes.hh" /******************************************************************************/ /* L o c a l C l a s s e s */ /******************************************************************************/ class XrdXrootdJob2Do : public XrdJob { public: friend class XrdXrootdJob; void DoIt(); enum JobStatus {Job_Active, Job_Cancel, Job_Done, Job_Waiting}; JobStatus Status; // Job Status XrdXrootdJob2Do(XrdXrootdJob *job, int jnum, const char **args, XrdXrootdResponse *Resp, int opts); ~XrdXrootdJob2Do(); private: int addClient(XrdXrootdResponse *rp, int opts); void delClient(XrdXrootdResponse *rp); XrdOucTList *lstClient(void); int verClient(int dodel=0); void Redrive(void); void sendResult(char *lp, int caned=0, int erc=0); static const int maxClients = 8; struct {XrdLink *Link; unsigned int Inst; kXR_char streamid[2]; char isSync; } Client[maxClients]; int numClients; XrdOucStream jobStream; // -> Stream for job I/O XrdXrootdJob *theJob; // -> Job description char *theArgs[5]; // -> Program arguments (see XrdOucProg) char *theResult; // -> The result int JobNum; // Job Number int JobRC; // Job kXR_ type return code char JobMark; char doRedrive; }; /******************************************************************************/ /* G l o b a l F u n c t i o n s */ /******************************************************************************/ extern XrdOucTrace *XrdXrootdTrace; int XrdXrootdJobWaiting(XrdXrootdJob2Do *item, void *arg) { return (item->Status == XrdXrootdJob2Do::Job_Waiting); } 
/******************************************************************************/ /* C l a s s X r d X r o o t d J o b 2 D o */ /******************************************************************************/ /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdXrootdJob2Do::XrdXrootdJob2Do(XrdXrootdJob *job, int jnum, const char **args, XrdXrootdResponse *resp, int opts) : XrdJob(job->JobName) { int i; for (i = 0; i < 5 && args[i]; i++) theArgs[i] = strdup(args[i]); for ( ; i < 5; i++) theArgs[i] = (char *)0; theJob = job; JobRC = 0; JobNum = jnum; JobMark = 0; numClients = 0; theResult = 0; doRedrive = 0; Status = Job_Waiting; addClient(resp, opts); } /******************************************************************************/ /* D e s t r u c t o r */ /******************************************************************************/ XrdXrootdJob2Do::~XrdXrootdJob2Do() { int i; for (i = 0; i < numClients; i++) if (!Client[i].isSync) {sendResult(0, 1); break;} for (i = 0; i < 5; i++) if (theArgs[i]) free(theArgs[i]); } /******************************************************************************/ /* D o I t */ /******************************************************************************/ void XrdXrootdJob2Do::DoIt() { XrdXrootdJob2Do *jp = 0; char *lp = 0; int i, rc = 0; // Obtain a lock to prevent status changes // theJob->myMutex.Lock(); // While we were waiting to run we may have been cancelled. 
If we were not then // perform the actual function and get the result and send to any async clients // if (Status != Job_Cancel) {if ((rc = theJob->theProg->Run(&jobStream, theArgs[1], theArgs[2], theArgs[3], theArgs[4]))) {Status = Job_Cancel; lp = jobStream.GetLine(); } else {theJob->myMutex.UnLock(); lp = jobStream.GetLine(); rc = theJob->theProg->RunDone(jobStream); theJob->myMutex.Lock(); if ((rc && rc != -EPIPE) || (rc == -EPIPE && (!lp || !(*lp)))) Status = Job_Cancel; else if (Status != Job_Cancel) {Status = Job_Done; for (i = 0; i < numClients; i++) if (!Client[i].isSync) {sendResult(lp); break;} } } } // If the number of jobs > than the max allowed, then redrive a waiting job // if in fact we represent a legitimate job slot (this could a phantom slot // due to ourselves being cancelled. // if (doRedrive) {if (theJob->numJobs > theJob->maxJobs) Redrive(); theJob->numJobs--; } // If there are no polling clients left or we have been cancelled, then we // will delete ourselves and, if cancelled, send a notofication to everyone // if (Status != Job_Cancel && numClients) theResult = lp; else {if (Status == Job_Cancel) sendResult(lp, (rc ? -1 : 1), rc); jp = theJob->JobTable.Remove(JobNum); } // At this point we may need to delete ourselves. If so, jp will not be zero. // This must be the last action in this method. 
// theJob->myMutex.UnLock(); if (jp) delete jp; } /******************************************************************************/ /* P r i v a t e M e t h o d s */ /******************************************************************************/ /******************************************************************************/ /* a d d C l i e n t */ /******************************************************************************/ int XrdXrootdJob2Do::addClient(XrdXrootdResponse *rp, int opts) { XrdLink *lp = rp->theLink(); unsigned int Inst = lp->Inst(); int i; // Remove useless clients // if (numClients >= maxClients) verClient(); // See if we are already here // for (i = 0; i < numClients; i++) if (lp == Client[i].Link && Inst == Client[i].Inst) return 0; // Add the client if we can // if (numClients >= maxClients) return -1; Client[numClients].Link = lp; Client[numClients].Inst = Inst; if (opts & JOB_Sync) Client[numClients].isSync = 1; else {rp->StreamID(Client[numClients].streamid); Client[numClients].isSync = 0; } numClients++; JobMark = 0; return 1; } /******************************************************************************/ /* d e l C l i e n t */ /******************************************************************************/ void XrdXrootdJob2Do::delClient(XrdXrootdResponse *rp) { XrdLink *lp = rp->theLink(); unsigned int Inst = lp->Inst(); int i, j; // See if we are already here // for (i = 0; i < numClients; i++) if (lp == Client[i].Link && Inst == Client[i].Inst) {for (j = i+1; j < numClients; j++) Client[i++] = Client[j]; numClients--; break; } } /******************************************************************************/ /* l s t C l i e n t */ /******************************************************************************/ // Warning! The size of buff is large enough for the default number of clients // per job element. 
// XrdOucTList *XrdXrootdJob2Do::lstClient() { char State, buff[4096], *bp = buff; int bsz, i, k; // Get the state pf the job element // switch(Status) {case Job_Active: State = 'a'; break; case Job_Cancel: State = 'c'; break; case Job_Done: State = 'd'; break; case Job_Waiting: State = 'w'; break; default: State = 'u'; break; }; // Insert the header (reserve 8 characters for the trailer) // bp = buff + sprintf(buff, "%c", State); bsz = sizeof(buff) - (bp - buff) - 8; // Remove all clients from a job whose network connection is no longer valid // if (!numClients) bp++; else for (i = 0; i < numClients; i++) if (Client[i].Link && Client[i].Link->isInstance(Client[i].Inst)) {if ((k = strlcpy(bp, Client[i].Link->ID, bsz)) >= bsz || (bsz -= k) < 1) {bp++; break;} bp += k; *bp = ' '; bp++; bsz--; } // Insert trailer // if (*(bp-1) == ' ') bp--; strcpy(bp, ""); // Return the text // return new XrdOucTList(buff, bp-buff+7); } /******************************************************************************/ /* v e r C l i e n t */ /******************************************************************************/ int XrdXrootdJob2Do::verClient(int dodel) { int i, j, k; // Remove all clients from a job whose network connection is no longer valid // for (i = 0; i < numClients; i++) if (!Client[i].Link->isInstance(Client[i].Inst)) {k = i; for (j = i+1; j < numClients && j < maxClients; j++,k++) Client[k] = Client[j]; numClients--; i--; } // If no more clients, delete ourselves if safe to do so (caller has lock) // if (!numClients && dodel) {XrdXrootdJob2Do *jp = theJob->JobTable.Remove(JobNum); if (jp->Status == XrdXrootdJob2Do::Job_Waiting) theJob->numJobs--; delete jp; return 0; } return numClients; } /******************************************************************************/ /* R e d r i v e */ /******************************************************************************/ void XrdXrootdJob2Do::Redrive() { XrdXrootdJob2Do *jp; int Start = 0; // Find the first waiting job 
// while ((jp = theJob->JobTable.Apply(XrdXrootdJobWaiting, (void *)0, Start))) if (jp->verClient(jp->JobMark > 0)) break; else Start = jp->JobNum+1; // Schedule this job if we really have one here // if (jp) {jp->Status = Job_Active; jp->doRedrive = 1; theJob->Sched->Schedule((XrdJob *)jp); } } /******************************************************************************/ /* s e n d R e s u l t */ /******************************************************************************/ void XrdXrootdJob2Do::sendResult(char *lp, int caned, int jrc) { static const char *TraceID = "sendResult"; static const kXR_int32 Xcan = static_cast(htonl(kXR_Cancelled)); XrdXrootdReqID ReqID; struct iovec jobVec[6]; XResponseType jobStat; const char *trc, *tre; kXR_int32 erc; int j, i, dlen = 0, n = 1; // Format the message to be sent // if (!caned && lp) {jobStat = kXR_ok; trc = "ok"; if (theArgs[0]) { jobVec[n].iov_base = theArgs[0]; // 1 dlen = jobVec[n].iov_len = strlen(theArgs[0]); n++; jobVec[n].iov_base = (char *)" "; // 2 dlen += jobVec[n].iov_len = 1; n++; } } else { jobStat = kXR_error; trc = "error"; if (caned > 0) {erc = Xcan; lp = (char *)"Cancelled by admin.";} else {erc = (jrc ? XProtocol::mapError(jrc) : kXR_ServerError); erc = static_cast(htonl(erc)); if (!lp || !*lp) lp = (char *)"Program failed."; } jobVec[n].iov_base = (char *)&erc; dlen = jobVec[n].iov_len = sizeof(erc); n++; // 3 } jobVec[n].iov_base = lp; // 4 dlen += jobVec[n].iov_len = strlen(lp)+1; n++; // Send the response to each client waiting for it // j = 0; for (i = 0; i < numClients; i++) {if (!Client[i].isSync) {ReqID.setID(Client[i].streamid, Client[i].Link->FDnum(), Client[i].Link->Inst()); tre = (XrdXrootdResponse::Send(ReqID, jobStat, jobVec, n, dlen) < 0 ? 
"skipped" : "sent"); TRACE(RSP, tre <<" async " <ID); } else if (i != j) Client[j++] = Client[i]; } numClients = j; } /******************************************************************************/ /* C l a s s X r d X r o o t d J o b */ /******************************************************************************/ /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdXrootdJob::XrdXrootdJob(XrdScheduler *schp, XrdOucProg *pgm, const char *jname, int maxjobs) : XrdJob("Job Scheduler"), JobTable(maxjobs*3) { // Initialize the base member here // Sched = schp; theProg = pgm; JobName = strdup(jname); maxJobs = maxjobs; numJobs = 0; // Schedule ourselves to run 15 minutes from now // schp->Schedule((XrdJob *)this, time(0) + (reScan)); } /******************************************************************************/ /* D e s t r u c t o r */ /******************************************************************************/ // Note! There is no reliable way to delete this object because various // unsynchronized threads may be pending at various break points. Fortunately, // there really is no need to ever delete an object of this kind. 
XrdXrootdJob::~XrdXrootdJob() { if (JobName) free(JobName); myMutex.Lock(); Sched->Cancel((XrdJob *)this); myMutex.UnLock(); } /******************************************************************************/ /* C a n c e l */ /******************************************************************************/ int XrdXrootdJob::Cancel(const char *jkey, XrdXrootdResponse *resp) { XrdXrootdJob2Do *jp = 0; int i, jNum, jNext = 0, numcaned = 0; // Lock our data // myMutex.Lock(); // Cancel a specific job if a key was passed // if (jkey) {if ((jp = JobTable.Find(jkey))) {numcaned = 1; if (resp) {jp->delClient(resp); if (!jp->numClients) CleanUp(jp); } else CleanUp(jp); } myMutex.UnLock(); return numcaned; } // Delete multiple jobs // while((jNum = JobTable.Next(jNext)) >= 0) {jp = JobTable.Item(jNum); if (resp) {i = jp->numClients; jp->delClient(resp); if (i != jp->numClients) numcaned++; if (!jp->numClients) CleanUp(jp); } else { CleanUp(jp); numcaned++; } } // All done // myMutex.UnLock(); return numcaned; } /******************************************************************************/ /* D o I t */ /******************************************************************************/ void XrdXrootdJob::DoIt() { int jNum, jNext = 0; XrdXrootdJob2Do *jp; // Scan through all of the jobs looking for disconnected clients // while((jNum = JobTable.Next(jNext)) >= 0) {myMutex.Lock(); if ((jp = JobTable.Item(jNum))) {if (jp->JobMark) {if (!jp->verClient()) CleanUp(jp);} else jp->JobMark = 1; } myMutex.UnLock(); } // Schedule ourselves to run 15 minutes from now // Sched->Schedule((XrdJob *)this, time(0) + (reScan)); } /******************************************************************************/ /* L i s t */ /******************************************************************************/ // Output: %jobkey%status%clientid ... .... 
// XrdOucTList *XrdXrootdJob::List() { char *jkey, buff[1024]; int tlen, jNum, jNext = 0; XrdXrootdJob2Do *jp; XrdOucTList *tF = 0, *tL = 0, *tp; // Scan through all of the jobs listing each, in turn // while((jNum = JobTable.Next(jNext)) >= 0) {myMutex.Lock(); if ((jp = JobTable.Item(jNum, &jkey)) && (tp = jp->lstClient())) {tlen = sprintf(buff, "%s", JobName, jkey); if (tL) tL->next = new XrdOucTList(buff, tlen, tp); else tF = new XrdOucTList(buff, tlen, tp); tL = tp->next = new XrdOucTList("", 6); } myMutex.UnLock(); } // Return the whole schmear // return tF; } /******************************************************************************/ /* S c h e d u l e */ /******************************************************************************/ int XrdXrootdJob::Schedule(const char *jkey, const char **args, XrdXrootdResponse *resp, int Opts) { XrdXrootdJob2Do *jp; const char *msg = "Job resources currently not available."; int jobNum, rc, isSync = Opts & JOB_Sync; // Make sure we have a target // if (!jkey || !(*jkey)) return resp->Send(kXR_ArgMissing, "Job target not specified."); // First find if this is a duplicate or create a new one // myMutex.Lock(); if (!(Opts & JOB_Unique) && jkey && (jp = JobTable.Find(jkey))) {if (jp->Status == XrdXrootdJob2Do::Job_Done) {rc = sendResult(resp, args[0], jp); myMutex.UnLock(); return rc; } if (jp->addClient(resp, Opts) < 0) isSync = 1; else msg = "Job scheduled."; } else { if ((jobNum = JobTable.Alloc()) < 0) isSync = 1; else {if ((jp = new XrdXrootdJob2Do(this, jobNum, args, resp, Opts))) {JobTable.Insert(jp, jkey, jobNum); if (numJobs < maxJobs) {Sched->Schedule((XrdJob *)jp); jp->Status = XrdXrootdJob2Do::Job_Active; jp->doRedrive = 1; } numJobs++; msg = "Job Scheduled"; } } } // Tell the client to wait // if (isSync) rc = resp->Send(kXR_wait, 30, msg); else rc = resp->Send(kXR_waitresp, 600, "Job scheduled."); myMutex.UnLock(); return rc; } 
/******************************************************************************/ /* P r i v a t e M e t h o d s */ /******************************************************************************/ /******************************************************************************/ /* C l e a n U p */ /******************************************************************************/ void XrdXrootdJob::CleanUp(XrdXrootdJob2Do *jp) { int theStatus = jp->Status; // Now we have to be careful. If the job is waiting or completed schedule // it for cancellation. If it's active then kill the associated process. The // thread waiting for the result will see the cancellation. Otherwise, it // already has been cancelled and is in the scheduled queue. // jp->Status = XrdXrootdJob2Do::Job_Cancel; if (theStatus == XrdXrootdJob2Do::Job_Waiting || theStatus == XrdXrootdJob2Do::Job_Done) Sched->Schedule((XrdJob *)jp); else{if (theStatus == XrdXrootdJob2Do::Job_Active) jp->jobStream.Drain();} if (theStatus == XrdXrootdJob2Do::Job_Waiting) numJobs--; } /******************************************************************************/ /* s e n d R e s u l t */ /******************************************************************************/ int XrdXrootdJob::sendResult(XrdXrootdResponse *resp, const char *rpfx, XrdXrootdJob2Do *job) { struct iovec jobResp[4]; int dlen, i, rc; // Send an error result if no result is present // if (!(job->theResult)) rc = resp->Send(kXR_ServerError,"Program failed"); else {if (!rpfx) {dlen = 0; i = 1;} else { jobResp[1].iov_base = (char *)rpfx; dlen = jobResp[1].iov_len = strlen(rpfx); jobResp[2].iov_base = (char *)" "; dlen += jobResp[2].iov_len = 1; i = 3; } jobResp[i].iov_base = job->theResult; dlen += jobResp[i].iov_len = strlen(job->theResult); rc = resp->Send(jobResp, i+1, dlen); } // Remove the client from the job. Check if clean-up is required // job->delClient(resp); if (!job->numClients) CleanUp(job); // All done // return rc; }