/******************************************************************************/ /* */ /* X r d C m s R e s p . c c */ /* */ /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include "XrdCms/XrdCmsClientMsg.hh" #include "XrdCms/XrdCmsParser.hh" #include "XrdCms/XrdCmsResp.hh" #include "XrdCms/XrdCmsTrace.hh" #include "XrdOuc/XrdOucBuffer.hh" #include "XrdOuc/XrdOucErrInfo.hh" #include "XrdSfs/XrdSfsInterface.hh" #include "XrdSys/XrdSysError.hh" using namespace XrdCms; /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ XrdSysSemaphore XrdCmsResp::isReady(0); XrdSysMutex XrdCmsResp::rdyMutex; XrdCmsResp *XrdCmsResp::First = 0; XrdCmsResp *XrdCmsResp::Last = 0; XrdSysMutex XrdCmsResp::myMutex; XrdCmsResp *XrdCmsResp::nextFree = 0; int XrdCmsResp::numFree = 0; int XrdCmsResp::RepDelay = 5; /******************************************************************************/ /* A l l o c */ /******************************************************************************/ XrdCmsResp *XrdCmsResp::Alloc(XrdOucErrInfo *erp, int msgid) { XrdCmsResp *rp; // Allocate a response object. We must be assured that the semaphore count // is zero. This will be true for freshly allocated objects. For reused // objects we will need to run down the count to zero as multiple calls // to sem_init may produced undefined behaviour. // myMutex.Lock(); if (nextFree) {rp = nextFree; nextFree = rp->next; numFree--; rp->SyncCB.Init(); } else if (!(rp = new XrdCmsResp())) {myMutex.UnLock(); return (XrdCmsResp *)0; } myMutex.UnLock(); // Initialize it. We also replace the callback object pointer with a pointer // to the synchronization semaphore as we have taken over the object and must // provide a callback synchronization path for the caller. The OucEI object // must be setup with pointers to stable areas and we will relocate any data // to allow for a message to be sent back without overwriting the data (usually // the path related to this request). We do this manually as the assignment // operator does a bit more and a bit less than what we really need to do here. // strlcpy(rp->UserID, erp->getErrUser(), sizeof(rp->UserID)); rp->setErrUser(rp->UserID); rp->setErrData(erp->getErrData(), XrdOucEI::Path_Offset); rp->setErrInfo(0, ""); rp->setErrMid(erp->getErrMid()); rp->ErrCB = erp->getErrCB(rp->ErrCBarg); erp->setErrCB((XrdOucEICB *)&rp->SyncCB); rp->myID = msgid; rp->next = 0; // Return the response object // return rp; } /******************************************************************************/ /* R e c y c l e */ /******************************************************************************/ void XrdCmsResp::Recycle() { // Recycle appendages // if (myBuff) {myBuff->Recycle(); myBuff = 0;} // We keep a stash of allocated response objects. If there are too many we // simply delete this object. // if (XrdCmsResp::numFree >= XrdCmsResp::maxFree) delete this; else {myMutex.Lock(); next = nextFree; nextFree = this; numFree++; myMutex.UnLock(); } } /******************************************************************************/ /* R e p l y */ /******************************************************************************/ // This version of reply simply queues the object for reply void XrdCmsResp::Reply(const char *manp, CmsRRHdr &rrhdr, XrdOucBuffer *netbuff) { // Copy the data we need to have // myRRHdr = rrhdr; myBuff = netbuff; next = 0; strlcpy(theMan, manp, sizeof(theMan)); // Now queue this object // rdyMutex.Lock(); if (Last) {Last->next = this; Last = this;} else Last=First = this; rdyMutex.UnLock(); // Now indicate we have something to process // isReady.Post(); } /******************************************************************************/ // This version of Reply() dequeues queued replies for processing void XrdCmsResp::Reply() { XrdCmsResp *rp; // Endless look looking for something to reply to // while(1) {isReady.Wait(); rdyMutex.Lock(); if ((rp = First)) {if (!(First = rp->next)) Last = 0; rdyMutex.UnLock(); rp->ReplyXeq(); } else rdyMutex.UnLock(); } } /******************************************************************************/ /* R e p l y X e q */ /******************************************************************************/ void XrdCmsResp::ReplyXeq() { EPNAME("Reply") XrdOucEICB *theCB; int Result; // If there is no callback object, ignore this call. Eventually, we may wish // to simulate a callback but this is rather complicated. // if (!ErrCB) {DEBUG("No callback object for user " <Done(Result, (XrdOucErrInfo *)this, getErrData()); } /******************************************************************************/ /* X r d O d c R e s p Q */ /******************************************************************************/ /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdCmsRespQ::XrdCmsRespQ() { memset(mqTab, 0, sizeof(mqTab)); } /******************************************************************************/ /* A d d */ /******************************************************************************/ void XrdCmsRespQ::Add(XrdCmsResp *rp) { int i; // Compute index and either add or chain the entry // i = rp->myID % mqSize; myMutex.Lock(); rp->next = (mqTab[i] ? mqTab[i] : 0); mqTab[i] = rp; myMutex.UnLock(); } /******************************************************************************/ /* P u r g e */ /******************************************************************************/ void XrdCmsRespQ::Purge() { XrdCmsResp *rp; int i; myMutex.Lock(); for (i = 0; i < mqSize; i++) {while ((rp = mqTab[i])) {mqTab[i] = rp->next; delete rp;}} myMutex.UnLock(); } /******************************************************************************/ /* R e m */ /******************************************************************************/ XrdCmsResp *XrdCmsRespQ::Rem(int msgid) { int i; XrdCmsResp *rp, *pp = 0; // Compute the index and find the entry // i = msgid % mqSize; myMutex.Lock(); rp = mqTab[i]; while(rp && rp->myID != msgid) {pp = rp; rp = rp->next;} // Remove the entry if we found it // if (rp) {if (pp) pp->next = rp->next; else mqTab[i] = rp->next; } // Return what we found // myMutex.UnLock(); return rp; }