/******************************************************************************/
/* */
/* X r d S s i P a c e r . c c */
/* */
/* (c) 2016 by the Board of Trustees of the Leland Stanford, Jr., University */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include "Xrd/XrdScheduler.hh"
#include "XrdOuc/XrdOucHash.hh"
#include "XrdSsi/XrdSsiPacer.hh"
/******************************************************************************/
/* G l o b a l O b j e c t s */
/******************************************************************************/
// Scheduler pointer defined elsewhere (declared extern here). This file uses
// it only to call Schedule() on pacer objects that are released to run.
//
namespace XrdSsi
{
extern XrdScheduler *schedP;
}
/******************************************************************************/
/* L o c a l O b j e c t s */
/******************************************************************************/
namespace
{
// Maps a request ID string to the XrdSsiPacer object that anchors the queue
// of held items for that ID. Entries are added by Hold() and Run() and are
// removed once the anchor has nothing queued and no remaining allowance.
// All access in this file happens while XrdSsiPacer::pMutex is held.
//
XrdOucHash<XrdSsiPacer> reqMap;
}
// Mutex taken (via XrdSsiMutexMon) on entry to Hold(), Reset() and Run(). It
// is constructed with the Recursive option.
//
XrdSsiMutex XrdSsiPacer::pMutex(XrdSsiMutex::Recursive);
// Anchor used by Hold() and Run() whenever no request ID is supplied.
//
XrdSsiPacer XrdSsiPacer::glbQ;
/******************************************************************************/
/* H o l d */
/******************************************************************************/
// Hold this object until it is released via Run(), or schedule it right away
// when the selected anchor still has an outstanding allowance.
//
// reqID - request ID whose anchor should be used (one is created and added
//         to the request map when none exists); a null pointer selects the
//         global anchor.
//
void XrdSsiPacer::Hold(const char *reqID)
{
   XrdSsiMutexMon myLock(pMutex);

// Pick the anchor: the global one when there is no request ID, otherwise the
// one registered for the ID (registering a fresh anchor if need be).
//
   if (reqID)
      {theQ = reqMap.Find(reqID);
       if (!theQ)
          {theQ = new XrdSsiPacer;
           reqMap.Add(reqID, theQ);
          }
      } else theQ = &glbQ;

// Without an allowance on the anchor we simply queue ourselves and wait
//
   if (!theQ->aCnt)
      {theQ->Q_PushBack(this);
       return;
      }

// An allowance exists; consume one unit of it and run without queueing
//
   XrdSsi::schedP->Schedule(this);
   theQ->aCnt--;

// A request-specific anchor with nothing queued and no allowance left is
// dropped from the request map.
//
   if (reqID && theQ->Singleton() && theQ->aCnt == 0) reqMap.Del(reqID);
}
/******************************************************************************/
/* R e s e t */
/******************************************************************************/
// Take this object out of whatever queue it is in. When that leaves a
// request-specific anchor with nothing queued and no allowance, the anchor's
// entry is removed from the request map.
//
void XrdSsiPacer::Reset()
{
   XrdSsiMutexMon myLock(pMutex);

// Nothing to do when we are not chained to anything
//
   if (Singleton()) return;

// Unchain ourselves from the queue
//
   Q_Remove();

// Only request-specific anchors are candidates for removal from the map
//
   if (!theQ || theQ == &glbQ) return;

// Drop the anchor's map entry if it is now idle
//
   const char *reqID = RequestID();
   if (reqID && theQ->Singleton() && theQ->aCnt == 0) reqMap.Del(reqID);
}
/******************************************************************************/
/* R u n */
/******************************************************************************/
// Release held items from an anchor's queue and/or adjust the anchor's
// allowance (the number of future Hold() calls that may run immediately).
//
// rInfo - receives the results:
//         iAllow - the anchor's allowance on entry.
//         fAllow - the anchor's allowance on exit.
//         qCount - the number of items still queued on exit.
//         rCount - incremented once per item scheduled here. It is not
//                  zeroed by this method; presumably the caller supplies an
//                  initialized object - TODO confirm against RDR_Info.
//         rInfo is left untouched when reqID has no anchor and rHow is not
//         RDR_One or RDR_Post, or when rHow is not a recognized value.
// rHow  - the action to take:
//         RDR_All   - release as many items as are currently queued.
//         RDR_Hold  - release nothing and set the allowance to zero.
//         RDR_Immed - release at most one item; the allowance is unchanged.
//         RDR_One   - release at most one item; an unused release becomes
//                     the new allowance.
//         RDR_Post  - raise the allowance by one, then release up to that
//                     many items; the remainder becomes the new allowance.
//         RDR_Query - report the current state without changing anything.
// reqID - selects the request-specific anchor; a null pointer selects the
//         global anchor. A missing anchor is created only for RDR_One and
//         RDR_Post.
//
void XrdSsiPacer::Run(XrdSsiRequest::RDR_Info &rInfo,
                      XrdSsiRequest::RDR_How   rHow, const char *reqID)
{
   XrdSsiMutexMon myLock(pMutex);
   XrdSsiPacer *anchor, *rItem;
   int allowed;

// Determine which anchor to use. Only the actions that can leave an allowance
// behind (RDR_One and RDR_Post) justify creating an anchor that does not yet
// exist; for any other action there is nothing to act upon.
//
   if (!reqID) anchor = &glbQ;
      else if (!(anchor = reqMap.Find(reqID)))
              {if (rHow != XrdSsiRequest::RDR_One
               &&  rHow != XrdSsiRequest::RDR_Post) return;
               anchor = new XrdSsiPacer;
               reqMap.Add(reqID, anchor);
              }

// Preset the information we will return
//
   rInfo.iAllow = allowed = anchor->aCnt;

// Process as requested
//
   switch(rHow)
         {case XrdSsiRequest::RDR_All:
               allowed = anchor->qCnt;
               break;
          case XrdSsiRequest::RDR_Hold:
               rInfo.qCount = anchor->qCnt;
               rInfo.fAllow = 0;
               anchor->aCnt = 0;
               // Zeroing the allowance may leave a request-specific anchor
               // with nothing queued. Apply the same cleanup used at the end
               // of this method so that such an anchor does not linger in
               // the request map. The anchor must not be used after this.
               //
               if (reqID && anchor->Singleton()) reqMap.Del(reqID);
               return;
          case XrdSsiRequest::RDR_Query:
               rInfo.fAllow = rInfo.iAllow;
               rInfo.qCount = anchor->qCnt;
               return;
          case XrdSsiRequest::RDR_Immed:
          case XrdSsiRequest::RDR_One:
               allowed = 1;
               break;
          case XrdSsiRequest::RDR_Post:
               allowed++;
               break;
          default: return;
         }

// Run responses. The loop relies on Q_Remove() keeping anchor->qCnt in step
// with the queue contents (defined outside this file).
//
   while(allowed && anchor->qCnt)
        {rItem = anchor->next;
         rItem->Q_Remove();
         XrdSsi::schedP->Schedule(rItem);
         rInfo.rCount++;
         allowed--;
        }

// Set returned information. RDR_Immed is a one-shot release that leaves the
// standing allowance untouched.
//
   rInfo.qCount = anchor->qCnt;
   if (rHow != XrdSsiRequest::RDR_Immed) anchor->aCnt = allowed;
   rInfo.fAllow = anchor->aCnt;

// If this is a local queue, check if we removed the last element
//
   if (reqID && anchor->Singleton() && anchor->aCnt == 0) reqMap.Del(reqID);
}