/******************************************************************************/
/* */
/* X r d C m s B a s e F S . c c */
/* */
/* (c) 2011 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include "XProtocol/YProtocol.hh"
#include "XProtocol/XPtypes.hh"
#include "XrdCms/XrdCmsBaseFS.hh"
#include "XrdCms/XrdCmsConfig.hh"
#include "XrdCms/XrdCmsPrepare.hh"
#include "XrdCms/XrdCmsTrace.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdSfs/XrdSfsFlags.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysTimer.hh"
using namespace XrdCms;
/******************************************************************************/
/* E x t e r n a l T h r e a d I n t e r f a c e s */
/******************************************************************************/
void *XrdCmsBasePacer(void *carg)
{((XrdCmsBaseFS *)carg)->Pacer();
return (void *)0;
}
void *XrdCmsBaseRunner(void *carg)
{((XrdCmsBaseFS *)carg)->Runner();
return (void *)0;
}
/******************************************************************************/
/* Private: B y p a s s */
/******************************************************************************/
int XrdCmsBaseFS::Bypass()
{
static XrdSysTimer Window;
// If we are not timing requests, we can bypass (typically checked beforehand)
//
if (!theQ.rLimit) return 1;
// If this is a fixed rate queue then we cannot bypass
//
if (Fixed) return 0;
// Check if we can reset the number of requests that can be issued inline. We
// do this to bypass the queue unless until we get flooded by requests.
//
theQ.Mutex.Lock();
if (!theQ.rLeft && !theQ.pqFirst)
{unsigned long Interval = 0;
Window.Report(Interval);
if (Interval >= 450)
{theQ.rLeft = theQ.rAgain;
Window.Reset();
cerr <<"BYPASS " <= 0 && Path[fnPos] != '/'; fnPos--) {}
if (fnPos > 0 && !hasDir(Path, fnPos)) return -1;
}
// Issue stat() via oss plugin. If it succeeds, return result.
//
if (!Config.ossFS->Stat(Path, &buf, Opts))
{if ((buf.st_mode & S_IFMT) == S_IFREG)
return (buf.st_mode & XRDSFS_POSCPEND ? CmsHaveRequest::Pending
: CmsHaveRequest::Online);
return (buf.st_mode & S_IFMT) == S_IFDIR ? CmsHaveRequest::Online : -1;
}
// The entry does not exist but if we are a staging server then it may be in
// the prepare queue in which case we must say that it is pending arrival.
//
if (Config.DiskSS && PrepQ.Exists(Path)) return CmsHaveRequest::Pending;
// The entry does not exist. Check if the directory exists and if not, put it
// in our directory missing table so we don't keep hitting this directory.
// This is disabled by default and enabled by the cms.dfs directive.
//
if (fnPos > 0 && dmLife)
{struct dMoP *xVal = &dirMiss;
int xLife = dmLife;
Path[fnPos] = '\0';
if (!Config.ossFS->Stat(Path, &buf, XRDOSS_resonly))
{xLife = dpLife; xVal = &dirPres;}
fsMutex.Lock();
fsDirMP.Rep(Path, xVal, xLife, Hash_keepdata);
fsMutex.UnLock();
DEBUG("add " <Present ? " okdir " : " nodir ") <Present : 1);
fsMutex.UnLock();
Path[fnPos] = '/';
return Have;
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
void XrdCmsBaseFS::Init(int Opts, int DMLife, int DPLife)
{
// Set values.
//
dmLife = DMLife;
dpLife = DPLife ? DPLife : DMLife * 10;
Server = (Opts & Servr) != 0;
lclStat = (Opts & Cntrl) != 0 || Server;
preSel = (Opts & Immed) == 0;
dfsSys = (Opts & DFSys) != 0;
}
/******************************************************************************/
/* L i m i t */
/******************************************************************************/
void XrdCmsBaseFS::Limit(int rLim, int Qmax)
{
// Establish the limits
//
if (rLim < 0) {theQ.rAgain=theQ.rLeft = -1; rLim = -rLim; Fixed = 1;}
else {theQ.rAgain = theQ.rLeft = (rLim > 1 ? rLim/2 : 1); Fixed = 0;}
theQ.rLimit = (rLim <= 1000 ? rLim : 0);
if (Qmax > 0) theQ.qMax = Qmax;
else if (!(theQ.qMax = theQ.rLimit*2 + theQ.rLimit/2)) theQ.qMax = 1;
}
/******************************************************************************/
/* P a c e r */
/******************************************************************************/
void XrdCmsBaseFS::Pacer()
{
XrdCmsBaseFR *rP;
int inQ, rqRate = 1000/theQ.rLimit;
// Process requests at the given rate
//
do{theQ.pqAvail.Wait();
theQ.Mutex.Lock(); inQ = 1;
while((rP = theQ.pqFirst))
{if (!(theQ.pqFirst = rP->Next)) {theQ.pqLast = 0; inQ = 0;}
theQ.Mutex.UnLock();
if (rP->PDirLen > 0 && !hasDir(rP->Path, rP->PDirLen))
{delete rP; continue;}
theQ.Mutex.Lock();
if (theQ.rqFirst) {theQ.rqLast->Next = rP; theQ.rqLast = rP;}
else {theQ.rqFirst = theQ.rqLast = rP; theQ.rqAvail.Post();}
theQ.Mutex.UnLock();
XrdSysTimer::Wait(rqRate);
if (!inQ) break;
theQ.Mutex.Lock();
}
if (inQ) theQ.Mutex.UnLock();
} while(1);
}
/******************************************************************************/
/* Q u e u e */
/******************************************************************************/
void XrdCmsBaseFS::Queue(XrdCmsRRData &Arg, XrdCmsPInfo &Who,
int fnpos, int Force)
{
EPNAME("Queue");
static int noMsg = 1;
XrdCmsBaseFR *rP;
int Msg, n, prevHWM;
// If we can bypass the queue and execute this now. Avoid the grabbing the buff.
//
if (!Force)
{XrdCmsBaseFR myReq(&Arg, Who, fnpos);
Xeq(&myReq);
return;
}
// Queue this request for callback after an appropriate time.
// We will also steal the underlying data buffer from the Arg.
//
DEBUG("inq " < prevHWM))) theQ.qHWM = n;
if (theQ.pqFirst) {theQ.pqLast->Next = rP; theQ.pqLast = rP;}
else {theQ.pqFirst = theQ.pqLast = rP; theQ.pqAvail.Post();}
theQ.Mutex.UnLock();
// Issue a warning message if we have an excessive number of requests queued
//
if (n > theQ.qMax && Msg && (n-prevHWM > 3 || noMsg))
{int Pct = n/theQ.qMax;
char Buff[80];
noMsg = 0;
sprintf(Buff, "Queue overrun %d%%; %d requests now queued.", Pct, n);
Say.Emsg("Pacer", Buff);
}
}
/******************************************************************************/
/* R u n n e r */
/******************************************************************************/
void XrdCmsBaseFS::Runner()
{
XrdCmsBaseFR *rP;
int inQ;
// Process requests at the given rate
//
do{theQ.rqAvail.Wait();
theQ.Mutex.Lock(); inQ = 1;
while((rP = theQ.rqFirst))
{if (!(theQ.rqFirst = rP->Next)) {theQ.rqLast = 0; inQ = 0;}
theQ.qNum--;
theQ.Mutex.UnLock();
Xeq(rP); delete rP;
if (!inQ) break;
theQ.Mutex.Lock();
}
if (inQ) theQ.Mutex.UnLock();
} while(1);
}
/******************************************************************************/
/* S t a r t */
/******************************************************************************/
void XrdCmsBaseFS::Start()
{
EPNAME("Start");
void *Me = (void *)this;
pthread_t tid;
// Issue some debugging here so we know how we are starting up
//
DEBUG("Srv=" <PDirLen > 0 && !hasDir(rP->Path, rP->PDirLen))
{if (cBack) (*cBack)(rP, -1);
return;
}
// If we have exceeded the queue limit and this is a meta-manager request
// then just deep-six it. Local requests must complete
//
if (theQ.qNum > theQ.qMax)
{Say.Emsg("Xeq", "Queue limit exceeded; ignoring lkup for", rP->Path);
return;
}
// Perform a local stat() and if we don't have the file
//
rc = Exists(rP->Path, rP->PDirLen);
if (cBack) (*cBack)(rP, rc);
}