/******************************************************************************/ /* */ /* X r d B w m H a n d l e . c c */ /* */ /* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include "XrdBwm/XrdBwmHandle.hh" #include "XrdBwm/XrdBwmLogger.hh" #include "XrdBwm/XrdBwmTrace.hh" #include "XrdSfs/XrdSfsInterface.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysPlatform.hh" #include "XProtocol/XProtocol.hh" /******************************************************************************/ /* S t a t i c O b j e c t s */ /******************************************************************************/ XrdBwmLogger *XrdBwmHandle::Logger = 0; XrdBwmPolicy *XrdBwmHandle::Policy = 0; XrdBwmHandle *XrdBwmHandle::Free = 0; unsigned int XrdBwmHandle::numQueued = 0; extern XrdSysError BwmEroute; /******************************************************************************/ /* L o c a l C l a s s e s */ /******************************************************************************/ class XrdBwmHandleCB : public XrdOucEICB, public XrdOucErrInfo { public: static XrdBwmHandleCB *Alloc() {XrdBwmHandleCB *mP; xMutex.Lock(); if (!(mP = Free)) mP = new XrdBwmHandleCB; else Free = mP->Next; xMutex.UnLock(); return mP; } void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0) {xMutex.Lock(); Next = Free; Free = this; xMutex.UnLock(); } int Same(unsigned long long arg1, unsigned long long arg2) {return 0;} XrdBwmHandleCB() : Next(0) {} ~XrdBwmHandleCB() {} private: XrdBwmHandleCB *Next; static XrdSysMutex xMutex; static XrdBwmHandleCB *Free; }; XrdSysMutex XrdBwmHandleCB::xMutex; XrdBwmHandleCB *XrdBwmHandleCB::Free = 0; /******************************************************************************/ /* E x t e r n a l L i n k a g e s */ /******************************************************************************/ void *XrdBwmHanXeq(void *pp) { return XrdBwmHandle::Dispatch(); } /******************************************************************************/ /* c l a s s X r d B w m H a n d l e */ /******************************************************************************/ /******************************************************************************/ /* A c t i v a t e */ /******************************************************************************/ #define tident Parms.Tident int XrdBwmHandle::Activate(XrdOucErrInfo &einfo) { EPNAME("Activate"); XrdSysMutexHelper myHelper(hMutex); char *rBuff; int rSize, rc; // Check the status of this request. // if (Status != Idle) {if (Status == Scheduled) einfo.setErrInfo(kXR_inProgress, "Request already scheduled."); else einfo.setErrInfo(kXR_InvalidRequest, "Visa already issued."); return SFS_ERROR; } // Try to schedule this request. // qTime = time(0); rBuff = einfo.getMsgBuff(rSize); if (!(rc = Policy->Schedule(rBuff, rSize, Parms))) return SFS_ERROR; // If resource immediately available, let client run // if (rc > 0) {rHandle = rc; Status = Dispatched; rTime = time(0); ZTRACE(sched,"Run " < ") < ") <Parms.Tident = theUsr; // Always available hP->Parms.Lfn = strdup(thePath); hP->Parms.LclNode = strdup(LclNode); hP->Parms.RmtNode = strdup(RmtNode); hP->Parms.Direction = (Incomming ? XrdBwmPolicy::Incomming : XrdBwmPolicy::Outgoing); hP->Status = Idle; hP->qTime = 0; hP->rTime = 0; hP->xSize = 0; hP->xTime = 0; } // All done // return hP; } /******************************************************************************/ /* private A l l o c # 2 */ /******************************************************************************/ XrdBwmHandle *XrdBwmHandle::Alloc(XrdBwmHandle *old_hP) { static const int minAlloc = 4096/sizeof(XrdBwmHandle); static XrdSysMutex aMutex; XrdBwmHandle *hP; // No handle currently in the table. Get a new one off the free list or // return one to the free list. // aMutex.Lock(); if (old_hP) {old_hP->Next = Free; Free = old_hP; hP = 0;} else {if (!Free && (hP = new XrdBwmHandle[minAlloc])) {int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}} if ((hP = Free)) Free = hP->Next; } aMutex.UnLock(); return hP; } /******************************************************************************/ /* D i s p a t c h */ /******************************************************************************/ #define tident hP->Parms.Tident void *XrdBwmHandle::Dispatch() { EPNAME("Dispatch"); XrdBwmHandleCB *erP = XrdBwmHandleCB::Alloc(); XrdBwmHandle *hP; char *RespBuff; int RespSize, readyH, Result, Err; // Dispatch ready requests in an endless loop // do { // Setup buffer // RespBuff = erP->getMsgBuff(RespSize); *RespBuff = '\0'; erP->setErrCode(0); // Get next ready request and test if it ended with an error // if ((Err = (readyH = Policy->Dispatch(RespBuff, RespSize)) < 0)) readyH = -readyH; // Find the matching handle // if (!(hP = refHandle(readyH))) {sprintf(RespBuff, "%d", readyH); BwmEroute.Emsg("Dispatch", "Lost handle from", RespBuff); if (!Err) Policy->Done(readyH); continue; } // Lock the handle and make sure it can be dispatched // hP->hMutex.Lock(); if (hP->Status != Scheduled) {BwmEroute.Emsg("Dispatch", "ref to unscheduled handle", hP->Parms.Tident, hP->Parms.Lfn); if (!Err) Policy->Done(readyH); } else { hP->myEICB.Wait(); hP->rTime = time(0); erP->setErrCB((XrdOucEICB *)erP, hP->ErrCBarg); if (Err) {hP->Status = Idle; Result = SFS_ERROR;} else {hP->Status = Dispatched; erP->setErrCode(strlen(RespBuff)); Result = (*RespBuff ? SFS_DATA : SFS_OK); } ZTRACE(sched,(Err?"Err ":"Run ") <Parms.Lfn <<' ' <Parms.LclNode <<(hP->Parms.Direction == XrdBwmPolicy::Incomming ? " <- ":" -> ") <Parms.RmtNode); hP->ErrCB->Done(Result, (XrdOucErrInfo *)erP); erP = XrdBwmHandleCB::Alloc(); } hP->hMutex.UnLock(); } while(1); // Keep the compiler happy // return (void *)0; } #undef tident /******************************************************************************/ /* private r e f H a n d l e */ /******************************************************************************/ XrdBwmHandle *XrdBwmHandle::refHandle(int refID, XrdBwmHandle *hP) { static XrdSysMutex tMutex; static struct {XrdBwmHandle *First; XrdBwmHandle *Last; } hTab[256] = {{0,0}}; XrdBwmHandle *pP = 0; int i = refID % 256; // If we have a handle passed, add the handle to the table // tMutex.Lock(); if (hP) {hP->Next = 0; if (hTab[i].Last) {hTab[i].Last->Next = hP; hTab[i].Last = hP;} else {hTab[i].First = hTab[i].Last = hP; hP->Next = 0;} numQueued++; } else { hP = hTab[i].First; while(hP && hP->rHandle != refID) {pP = hP; hP = hP->Next;} if (hP) {if (pP) pP->Next = hP->Next; else hTab[i].First = hP->Next; if (hTab[i].Last == hP) hTab[i].Last = pP; numQueued--; } } tMutex.UnLock(); // All done. // return hP; } /******************************************************************************/ /* public R e t i r e */ /******************************************************************************/ // The handle must be locked upon entry! It is unlocked upon exit. void XrdBwmHandle::Retire() { XrdSysMutexHelper myHelper(hMutex); // Get the global lock as the links field can only be manipulated with it. // If not idle, cancel the resource. If scheduled, remove it from the table. // if (Status != Idle) {Policy->Done(rHandle); if (Status == Scheduled && !refHandle(rHandle, this)) BwmEroute.Emsg("Retire", "Lost handle to", Parms.Tident, Parms.Lfn); Status = Idle; rHandle = 0; } // If we have a logger, then log this event // if (Logger && qTime) {XrdBwmLogger::Info myInfo; myInfo.Tident = Parms.Tident; myInfo.Lfn = Parms.Lfn; myInfo.lclNode = Parms.LclNode; myInfo.rmtNode = Parms.RmtNode; myInfo.ATime = qTime; myInfo.BTime = rTime; myInfo.CTime = time(0); myInfo.Size = xSize; myInfo.ESec = xTime; myInfo.Flow = (Parms.Direction == XrdBwmPolicy::Incomming ? 'I':'O'); Policy->Status(myInfo.numqIn, myInfo.numqOut, myInfo.numqXeq); Logger->Event(myInfo); } // Free storage appendages and recycle handle // if (Parms.Lfn) {free(Parms.Lfn); Parms.Lfn = 0;} if (Parms.LclNode) {free(Parms.LclNode); Parms.LclNode = 0;} if (Parms.RmtNode) {free(Parms.RmtNode); Parms.RmtNode = 0;} Alloc(this); } /******************************************************************************/ /* s e t P o l i c y */ /******************************************************************************/ int XrdBwmHandle::setPolicy(XrdBwmPolicy *pP, XrdBwmLogger *lP) { pthread_t tid; int rc, startThread = (Policy == 0); // Set the policy and then start a thread to do dispatching if we have none // Policy = pP; if (startThread) if ((rc = XrdSysThread::Run(&tid, XrdBwmHanXeq, (void *)0, 0, "Handle Dispatcher"))) {BwmEroute.Emsg("setPolicy", rc, "create handle dispatch thread"); return 1; } // All done // Logger = lP; return 0; }