/******************************************************************************/ /* */ /* X r d C l i e n t I n p u t B u f f e r . c c */ /* */ /* Author: Fabrizio Furano (INFN Padova, 2004) */ /* Adapted from TXNetFile (root.cern.ch) originally done by */ /* Alvise Dorigo, Fabrizio Furano */ /* INFN Padova, 2003 */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ ////////////////////////////////////////////////////////////////////////// // // // Buffer for incoming messages (responses) // // Handles the waiting (with timeout) for a message to come // // belonging to a logical streamid // // Multithread friendly // // // ////////////////////////////////////////////////////////////////////////// #include "XrdClient/XrdClientInputBuffer.hh" #include "XrdSys/XrdSysPthread.hh" #include "XrdClient/XrdClientDebug.hh" #ifndef WIN32 #include #endif #include using namespace std; //________________________________________________________________________ int XrdClientInputBuffer::MsgForStreamidCnt(int streamid) { // Counts the number of messages belonging to the given streamid int cnt = 0; XrdClientMessage *m = 0; for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) { m = fMsgQue[fMsgIter]; if (m->MatchStreamid(streamid)) cnt++; } return cnt; } //________________________________________________________________________ int XrdClientInputBuffer::WipeStreamid(int streamid) { // Remove all the pending messages for the given streamid // Healthy after connection shutdowns int cnt = 0; XrdClientMessage *m = 0; { XrdSysMutexHelper mtx(fMutex); for (fMsgIter = fMsgQue.GetSize()-1; fMsgIter >= 0; --fMsgIter) { m = fMsgQue[fMsgIter]; if (m->MatchStreamid(streamid)) { delete m; fMsgQue.Erase(fMsgIter); cnt++; } } } return cnt; } //________________________________________________________________________ XrdSysSemWait *XrdClientInputBuffer::GetSyncObjOrMakeOne(int streamid) { // Gets the right sync obj to wait for messages for a given streamid // If the semaphore is not available, it creates one. XrdSysSemWait *sem; { XrdSysMutexHelper mtx(fMutex); char buf[20]; snprintf(buf, 20, "%d", streamid); sem = fSyncobjRepo.Find(buf); if (!sem) { sem = new XrdSysSemWait(0); fSyncobjRepo.Rep(buf, sem); return sem; } else return sem; } } //_______________________________________________________________________ XrdClientInputBuffer::XrdClientInputBuffer() { // Constructor fMsgQue.Clear(); } //_______________________________________________________________________ int DeleteHashItem(const char *key, XrdSysSemWait *sem, void *Arg) { // This makes the Apply method delete the entry return -1; } XrdClientInputBuffer::~XrdClientInputBuffer() { // Destructor // Delete all the syncobjs { XrdSysMutexHelper mtx(fMutex); // Delete the content of the queue for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) { if (fMsgQue[fMsgIter]) delete fMsgQue[fMsgIter]; fMsgQue[fMsgIter] = 0; } fMsgQue.Clear(); // Delete all the syncobjs fSyncobjRepo.Apply(DeleteHashItem, 0); } } //_______________________________________________________________________ int XrdClientInputBuffer::PutMsg(XrdClientMessage* m) { // Put message in the list int sz; XrdSysSemWait *sem = 0; { XrdSysMutexHelper mtx(fMutex); fMsgQue.Push_back(m); sz = MexSize(); // Is anybody sleeping ? if (m) sem = GetSyncObjOrMakeOne( m->HeaderSID() ); } if (sem) { sem->Post(); } return sz; } //_______________________________________________________________________ XrdClientMessage *XrdClientInputBuffer::GetMsg(int streamid, int secstimeout) { // Gets the first XrdClientMessage from the queue, given a matching streamid. // If there are no XrdClientMessages for the streamid, it waits for a number // of seconds for something to come XrdSysSemWait *sem = 0; XrdClientMessage *res = 0, *m = 0; // Find the sem where to wait for a msg sem = GetSyncObjOrMakeOne(streamid); int to = secstimeout; int dt = (to > 2) ? 2 : to; // 2 secs steps while (to > 0) { int rc = sem->Wait(dt); if (!rc) { // make sure is not a spurious signal ... XrdSysMutexHelper mtx(fMutex); if (fMsgQue.GetSize() > 0) { // We were awakened. Or the timeout elapsed. The mtx is again locked. // If there are messages to dequeue, we pick the oldest one for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) { m = fMsgQue[fMsgIter]; if ((!m) || m->IsError() || m->MatchStreamid(streamid)) { res = fMsgQue[fMsgIter]; fMsgQue.Erase(fMsgIter); if (!m) return 0; break; } } break; } } else to -= dt; } return res; }