/******************************************************************************/ /* */ /* X r d F r m X f r Q u e u e . c c */ /* */ /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #include #include #include #include #include #include "XrdFrc/XrdFrcReqFile.hh" #include "XrdFrc/XrdFrcTrace.hh" #include "XrdFrm/XrdFrmConfig.hh" #include "XrdFrm/XrdFrmXfrJob.hh" #include "XrdFrm/XrdFrmXfrQueue.hh" #include "XrdNet/XrdNetMsg.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucTList.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysFD.hh" #include "XrdSys/XrdSysTimer.hh" #include "XrdSys/XrdSysPlatform.hh" using namespace XrdFrc; using namespace XrdFrm; /******************************************************************************/ /* S t a t i c s */ /******************************************************************************/ XrdSysMutex XrdFrmXfrQueue::hMutex; XrdOucHash XrdFrmXfrQueue::hTab; XrdSysMutex XrdFrmXfrQueue::qMutex; XrdSysSemaphore XrdFrmXfrQueue::qReady(0); XrdFrmXfrQueue::theQueue XrdFrmXfrQueue::xfrQ[XrdFrcRequest::numQ]; /******************************************************************************/ /* Public: A d d */ /******************************************************************************/ int XrdFrmXfrQueue::Add(XrdFrcRequest *rP, XrdFrcReqFile *reqFQ, int qNum) { XrdFrmXfrJob *xP; struct stat buf; const char *xfrType = xfrName(*rP, qNum); char *Lfn, lclpath[MAXPATHLEN]; int Outgoing = (qNum & XrdFrcRequest::outQ); // Validate queue number // if (qNum < 0 || qNum >= XrdFrcRequest::numQ-1) {sprintf(lclpath, "%d", qNum); Say.Emsg("Queue", lclpath, " is an invalid queue; skipping", rP->LFN); if (reqFQ) reqFQ->Del(rP); return 0; } // First check if this request is active or pending. If it's an inbound request // then only the lfn matters regardless of source. For outgoing requests then // the lfn plus the target only matters. // Lfn = (Outgoing ? rP->LFN : (rP->LFN)+rP->LFO); hMutex.Lock(); if ((xP = hTab.Find(Lfn))) {if (rP->Options & (XrdFrcRequest::msgSucc | XrdFrcRequest::msgFail) && strcmp(xP->reqData.Notify, rP->Notify)) {XrdOucTList *tP = new XrdOucTList(rP->Notify, 0, xP->NoteList); xP->NoteList = tP; } hMutex.UnLock(); if (Config.Verbose || Trace.What & TRACE_Debug) {sprintf(lclpath, " in progress; %s skipped for ", xfrType); Say.Say(0, xP->Type, xP->reqData.LFN, lclpath, rP->User); } if (reqFQ) reqFQ->Del(rP); return 0; } hMutex.UnLock(); // Obtain the local name // if (!Config.LocalPath((rP->LFN)+rP->LFO, lclpath, sizeof(lclpath)-16)) {if (reqFQ) reqFQ->Del(rP); return Notify(rP, qNum, 1, "Unable to generate pfn"); } // Check if the file exists or not. For incomming requests, the file must not // exist. For outgoing requests the file must exist. // if (Config.Stat((rP->LFN)+rP->LFO, lclpath, &buf)) {if (Outgoing) {if (Config.Verbose || Trace.What & TRACE_Debug) Say.Say(0, xfrType,"skipped; ",lclpath," not resident."); if (reqFQ) reqFQ->Del(rP); return Notify(rP, qNum, 2, "file not resident"); } } else { if (!Outgoing) {if (Config.Verbose || Trace.What & TRACE_Debug) Say.Say(0, xfrType, "skipped; ", lclpath, " exists."); if (reqFQ) reqFQ->Del(rP); return Notify(rP, qNum, 0); } } // Obtain a queue slot, we may block until one is available // do {qMutex.Lock(); if ((xP = xfrQ[qNum].Free)) break; qMutex.UnLock(); xfrQ[qNum].Avail.Wait(); } while(!xP); xfrQ[qNum].Free = xP->Next; qMutex.UnLock(); // Initialize the slot // xP->Next = 0; xP->NoteList = 0; xP->reqFQ = reqFQ; xP->reqData = *rP; xP->reqFile = (Outgoing ? xP->reqData.LFN : (xP->reqData.LFN)+rP->LFO); strcpy(xP->PFN, lclpath); xP->pfnEnd = strlen(lclpath); xP->RetCode = 0; xP->qNum = qNum; xP->Act =*xfrType; xP->Type = xfrType+1; // Add this to the table of requests // hMutex.Lock(); hTab.Add(xP->reqFile, xP, 0, Hash_keep); hMutex.UnLock(); // Place request in the appropriate transfer queue // qMutex.Lock(); if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;} else xfrQ[qNum].Last = xfrQ[qNum].First = xP; qMutex.UnLock(); qReady.Post(); // All done // return 1; } /******************************************************************************/ /* Public: D o n e */ /******************************************************************************/ void XrdFrmXfrQueue::Done(XrdFrmXfrJob *xP, const char *Msg) { XrdOucTList *tP; // Send notifications to everyone that wants it that this job is done // do {Notify(&(xP->reqData), xP->qNum, xP->RetCode, Msg); if ((tP = xP->NoteList)) {strcpy(xP->reqData.Notify, tP->text); xP->NoteList = tP->next; delete tP; } } while(tP); // Remove this job from the queue file // if (xP->reqFQ) xP->reqFQ->Del(&(xP->reqData)); // Remove this job from the active table // hMutex.Lock(); hTab.Del(xP->reqFile); hMutex.UnLock(); // Place job element on the free queue // qMutex.Lock(); xP->Next = xfrQ[xP->qNum].Free; xfrQ[xP->qNum].Free = xP; xfrQ[xP->qNum].Avail.Post(); qMutex.UnLock(); } /******************************************************************************/ /* Public: G e t */ /******************************************************************************/ XrdFrmXfrJob *XrdFrmXfrQueue::Get() { XrdFrmXfrJob *xfrP; // Wait for an available job and return it // do {qReady.Wait();} while(!(xfrP = Pull())); return xfrP; } /******************************************************************************/ /* I n i t */ /******************************************************************************/ void *InitStop(void *parg) { XrdFrmXfrQueue::StopMon(parg); return (void *)0; } int XrdFrmXfrQueue::Init() { static const char *StopFN[] = {"STAGE", "MIGR", "COPYIN", "COPYOUT"}; static const char *StopQN[] = {"stage", "migr", "copyin", "copyout"}; XrdFrmXfrJob *xP; pthread_t tid; char StopFile[1024], *fnSfx; int n, qNum, retc; // Prepare to initialize the queues // strcpy(StopFile, Config.AdminPath); strcat(StopFile, "STOP"); fnSfx = StopFile + strlen(StopFile); // Initialize each queue // for (qNum= 0; qNum < XrdFrcRequest::numQ-1; qNum++) { // Initialize the stop file name and set the queue name and number // strcpy(fnSfx, StopFN[qNum]); xfrQ[qNum].File = strdup(StopFile); xfrQ[qNum].Name = StopQN[qNum]; xfrQ[qNum].qNum = qNum; // Start the stop file monitor thread for this queue // if ((retc = XrdSysThread::Run(&tid, InitStop, (void *)&xfrQ[qNum], XRDSYSTHREAD_BIND, "Stopfile monitor"))) {Say.Emsg("main", retc, "create stopfile thread"); return 0;} // Create twice as many free queue elements as we have xfr agents for the // queue. This prevents stalls when a particular queue is stopped but keeps // us from exceeding internal resources when we get flooded with requests. // n = Config.xfrMax*2; while(n--) {xP = new XrdFrmXfrJob; xP->Next = xfrQ[qNum].Free; xfrQ[qNum].Free = xP; xfrQ[qNum].Avail.Post(); } } // All done // return 1; } /******************************************************************************/ /* Private: P u l l */ /******************************************************************************/ XrdFrmXfrJob *XrdFrmXfrQueue::Pull() { static int ioX = 0, prevQ[2] = {0,0}; XrdFrmXfrJob *xfrP; int pikQ, theQ, Q1, Q2, nSel = 1; // Setup to pick a request equally multiplexing between all possible queues // qMutex.Lock(); do{ioX = (ioX + 1) & 1; if (ioX) {Q1 = XrdFrcRequest::migQ; Q2 = XrdFrcRequest::putQ; pikQ = 1;} else {Q1 = XrdFrcRequest::stgQ; Q2 = XrdFrcRequest::getQ; pikQ = 0;} // Check if we should avoid either queue because it is stopped // if (xfrQ[Q1].Stop || Stopped(Q1)) Q1 = XrdFrcRequest::nilQ; if (xfrQ[Q2].Stop || Stopped(Q2)) Q2 = XrdFrcRequest::nilQ; // Pick the oldest possible request // if (xfrQ[Q1].First && xfrQ[Q2].First) { if (xfrQ[Q1].First->reqData.addTOD < xfrQ[Q2].First->reqData.addTOD) theQ = Q1; else if (xfrQ[Q1].First->reqData.addTOD > xfrQ[Q2].First->reqData.addTOD) theQ = Q2; else theQ = (prevQ[pikQ] == Q1 ? Q2 : Q1); }else theQ = (xfrQ[Q1].First ? Q1 : Q2); // Dequeue the request (we may have an empty selectoin here) // if ((xfrP = xfrQ[theQ].First) && !(xfrQ[theQ].First = xfrP->Next)) xfrQ[theQ].Last = 0; } while(!xfrP && nSel--); // Return the job, if any // prevQ[pikQ] = theQ; qMutex.UnLock(); return xfrP; } /******************************************************************************/ /* Private: N o t i f y */ /******************************************************************************/ int XrdFrmXfrQueue::Notify(XrdFrcRequest *rP, int qNum, int rc, const char *msg) { static const char *isFile = "file:///"; static const int lnFile = 8; static const char *isUDP = "udp://"; static const int lnUDP = 6; static const char *qOpr[] = {"stage", "migr", "get", "put"}; char msgbuff[4096], *nP, *mP = rP->Notify; int n; // Check if message really needs to be sent // if ((!rc && !(rP->Options & XrdFrcRequest::msgSucc)) || ( rc && !(rP->Options & XrdFrcRequest::msgFail))) return 0; // Multiple destinations can be specified, each destination separated by a // carriable rturn. We don't screen out duplicates. // do{if ((nP = index(rP->Notify, '\r'))) *nP++ = '\0'; // Check for file destination // if (!strncmp(mP, isFile, lnFile)) {if (rc) n = sprintf(msgbuff, "%s %s %s %s\n", qOpr[qNum], (rc > 1 ? "ENOENT":"BAD"), rP->LFN, (msg ? msg:"?")); else n = sprintf(msgbuff, "stage OK %s\n", rP->LFN); Send2File(mP+lnFile, msgbuff, n); } // Check for udp destination // else if (!strncmp(mP, isUDP, lnUDP)) {char *txtP, *dstP = mP+lnUDP; if ((txtP = index(dstP, '/'))) *txtP++ = '\0'; else txtP = (char *)""; n = sprintf(msgbuff, "%s %s %s %s", (rc ? "unprep" : "ready"), rP->ID, txtP, rP->LFN); Send2UDP(dstP, msgbuff, n); } // Issue warning as we don't yet support mail or tcp notifications // else if (*mP != '-') Say.Emsg("Notify", "Unsupported notification path '", mP, "'."); } while((mP = nP)); // All done // return 0; } /******************************************************************************/ /* Private: S e n d 2 F i l e */ /******************************************************************************/ void XrdFrmXfrQueue::Send2File(char *Dest, char *Msg, int Mln) { EPNAME("Notify"); int FD; // Do some debugging // DEBUG("sending '" <Name); // Wait until someone needs to tell us to check for a stop file // while(1) {monQ->Alert.Wait(); Cnt = 0; while(!stat(monQ->File, &buf)) {if (!Cnt--) {Say.Emsg("StopMon", monQ->File, theMsg); Cnt = 12;} XrdSysTimer::Snooze(5); } qMutex.Lock(); monQ->Stop = 0; xP = monQ->First; while(xP) {qReady.Post(); xP = xP->Next;} qMutex.UnLock(); } } /******************************************************************************/ /* Private: S t o p p e d */ /******************************************************************************/ int XrdFrmXfrQueue::Stopped(int qNum) // Called with qMutex locked! { struct stat buf; // Check for stop file existence. If it exists and the queue has not been // stopped; stop it and alert the stop file monitor. // if (stat(xfrQ[qNum].File, &buf)) return 0; if (!xfrQ[qNum].Stop) {xfrQ[qNum].Stop = 1; xfrQ[qNum].Alert.Post();} return 1; } /******************************************************************************/ /* Private: x f r N a m e */ /******************************************************************************/ const char *XrdFrmXfrQueue::xfrName(XrdFrcRequest &reqData, int qNum) { // Return a human name for this transfer: // Migrate // Migr+rm // Staging // CopyIn // CopyOut // Copy+rm // switch(qNum) {case XrdFrcRequest::getQ: return "1CopyIn "; break; case XrdFrcRequest::migQ: return (reqData.Options & XrdFrcRequest::Purge ? "3Migr+rm ":"2Migrate "); break; case XrdFrcRequest::putQ: return (reqData.Options&XrdFrcRequest::Purge ? "5Copy+rm " : "4CopyOut "); break; case XrdFrcRequest::stgQ: return "6Staging "; break; default: break; } return "0Unknown "; }