/******************************************************************************/
/* */
/* X r d F r m R e q B o s s . c c */
/* */
/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdFrc/XrdFrcCID.hh"
#include "XrdFrc/XrdFrcTrace.hh"
#include "XrdFrc/XrdFrcUtils.hh"
#include "XrdFrm/XrdFrmReqBoss.hh"
#include "XrdFrm/XrdFrmXfrQueue.hh"
#include "XrdNet/XrdNetMsg.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdSys/XrdSysHeaders.hh"
using namespace XrdFrc;
/******************************************************************************/
/* T h r e a d I n t e r f a c e s */
/******************************************************************************/
void *mainServerXeq(void *parg)
{
XrdFrmReqBoss *theBoss = (XrdFrmReqBoss *)parg;
theBoss->Process();
return (void *)0;
}
/******************************************************************************/
/* Public: A d d */
/******************************************************************************/
void XrdFrmReqBoss::Add(XrdFrcRequest &Request)
{
// Complete the request including verifying the priority
//
if (Request.Prty > XrdFrcRequest::maxPrty)
Request.Prty = XrdFrcRequest::maxPrty;
else if (Request.Prty < 0)Request.Prty = 0;
Request.addTOD = time(0);
// Now add it to the queue
//
rQueue[static_cast(Request.Prty)]->Add(&Request);
// Now wake ourselves up
//
Wakeup(1);
}
/******************************************************************************/
/* Public: D e l */
/******************************************************************************/
void XrdFrmReqBoss::Del(XrdFrcRequest &Request)
{
int i;
// Remove all pending requests for this id
//
for (i = 0; i <= XrdFrcRequest::maxPrty; i++) rQueue[i]->Can(&Request);
}
/******************************************************************************/
/* Public: P r o c e s s */
/******************************************************************************/
void XrdFrmReqBoss::Process()
{
EPNAME("Process");
XrdFrcRequest myReq;
int i, rc, numXfr, numPull;;
// Perform staging in an endless loop
//
do{Wakeup(0);
do{numXfr = 0;
for (i = XrdFrcRequest::maxPrty; i >= 0; i--)
{numPull = i+1;
while(numPull && (rc = rQueue[i]->Get(&myReq)))
{if (myReq.Options & XrdFrcRequest::Register) Register(myReq,i);
else {numPull -= XrdFrmXfrQueue::Add(&myReq,rQueue[i],theQ);
numXfr++;
DEBUG(Persona <<" from Q " << i <<' ' <(Req.addTOD), Pid))
{DEBUG("Instance=" <Init()) {free(qPath); return 0;}
}
free(qPath);
// Start the request processing thread
//
if ((retc = XrdSysThread::Run(&tid, mainServerXeq, (void *)this,
XRDSYSTHREAD_BIND, Persona)))
{sprintf(buff, "create %s request thread", Persona);
Say.Emsg("Start", retc, buff);
return 0;
}
// All done
//
return 1;
}
/******************************************************************************/
/* Public: W a k e u p */
/******************************************************************************/
void XrdFrmReqBoss::Wakeup(int PushIt)
{
static XrdSysMutex rqMutex;
// If this is a PushIt then see if we need to push the binary semaphore
//
if (PushIt) {rqMutex.Lock();
if (!isPosted) {rqReady.Post(); isPosted = 1;}
rqMutex.UnLock();
}
else {rqReady.Wait();
rqMutex.Lock(); isPosted = 0; rqMutex.UnLock();
}
}