/***********************************************************************************************/
/* */
/* X r d C m s N o d e . c c */
/* */
/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include "Xrd/XrdJob.hh"
#include "Xrd/XrdLink.hh"
#include "XProtocol/YProtocol.hh"
#include "XrdCms/XrdCmsBaseFS.hh"
#include "XrdCms/XrdCmsCache.hh"
#include "XrdCms/XrdCmsCluster.hh"
#include "XrdCms/XrdCmsClustID.hh"
#include "XrdCms/XrdCmsConfig.hh"
#include "XrdCms/XrdCmsManager.hh"
#include "XrdCms/XrdCmsManList.hh"
#include "XrdCms/XrdCmsMeter.hh"
#include "XrdCms/XrdCmsPList.hh"
#include "XrdCms/XrdCmsPrepare.hh"
#include "XrdCms/XrdCmsRRData.hh"
#include "XrdCms/XrdCmsNode.hh"
#include "XrdCms/XrdCmsSelect.hh"
#include "XrdCms/XrdCmsState.hh"
#include "XrdCms/XrdCmsTrace.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucName2Name.hh"
#include "XrdOuc/XrdOucProg.hh"
#include "XrdOuc/XrdOucPup.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdNet/XrdNetUtils.hh"
#include "XrdSys/XrdSysPlatform.hh"
using namespace XrdCms;
/******************************************************************************/
/* S t a t i c O b j e c t s */
/******************************************************************************/
// Storage for the static data members of XrdCmsNode.
//
XrdSysMutex XrdCmsNode::mlMutex;
int         XrdCmsNode::LastFree = 0;

namespace
{
// Table of selectable interface types. do_Select() indexes it with the
// request's kYR_retipmsk option bits shifted right by kYR_retipsft.
//
XrdNetIF::ifType ifVec[4] =
                {XrdNetIF::PublicV4,
                 XrdNetIF::Public46,
                 XrdNetIF::PublicV6,
                 XrdNetIF::Public64
                };
}
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
// Construct a node object and give every member a known initial value.
//
// lnkp   - the link associated with the node; the node starts out offline
//          when this is null. NOTE(review): setName() dereferences lnkp
//          unconditionally, so callers are presumably expected to pass a
//          valid link - confirm.
// theIF  - interface specification handed to setName().
// nid    - node identifier; "?" is recorded when null, and in that case the
//          port passed to setName() is forced to zero.
// port   - data port (only used when nid is not null).
// lvl    - recorded as myLevel.
// id     - slot number; a negative value yields an empty NodeMask.
//
XrdCmsNode::XrdCmsNode(XrdLink *lnkp, const char *theIF, const char *nid,
                       int port, int lvl, int id) : nodeMutex(0, "nodeCV")
{
   static XrdSysMutex   iMutex;
   static const SMask_t smask_1(1);
   static int           iNum = 1;

// Basic identity: the link, the slot number and the corresponding bit mask
//
   Link     = lnkp;
   NodeMask = (id < 0 ? 0 : smask_1 << id);
   NodeID   = id;
   cidP     = 0;

// Status flags. isLocked is tested by the destructor, so make sure it starts
// out cleared like all the other flags.
//
   hasNet   = 0;
   isBad    = 0;
   isOffline= (lnkp == 0);
   isNoStage= 0;
   isBound  = 0;
   isConn   = 0;
   isGone   = 0;
   isPerm   = 0;
   isMan    = 0;
   isKnown  = 0;
   isPeer   = 0;
   isLocked = 0;
   incUL    = 0;

// Load and space accounting
//
   myCost   = 0;
   myLoad   = 0;
   myMass   = 0;
   DiskTotal= 0;
   DiskFree = 0;
   DiskMinF = 0;
   DiskNums = 0;
   DiskUtil = 0;

// Selection reference counters and shares
//
   Next     = 0;
   RefW     = 0;
   RefTotW  = 0;
   RefR     = 0;
   RefTotR  = 0;
   Share    = 0;
   Shrem    = 0;
   Shrin    = 0;
   logload  = Config.LogPerf;
   DropTime = 0;
   DropJob  = 0;

// Naming information; myName, myNlen and Ident are filled in by setName()
//
   myName   = 0;
   myNlen   = 0;
   Ident    = 0;

// Keep a private copy of the node ID. The cluster ID is whatever follows the
// first blank in the node ID or the whole node ID when there is no blank.
// strchr() is used instead of the legacy index() call.
//
   myNID    = strdup(nid ? nid : "?");
   if ((myCID = strchr(myNID, ' '))) myCID++;
      else myCID = myNID;

// Remaining miscellaneous state
//
   myLevel  = lvl;
   ConfigID = 0;
   TZValid  = 0;
   TimeZone = 0;
   subsPort = 0;
   myVersion= kYR_Version;
   lkCount  = 0;
   ulCount  = 0;
   Manager  = 0;

// setName() will set the node identification information
//
   setName(lnkp, theIF, (nid ? port : 0));

// Assign a unique instance number; the counter is shared by all nodes so it
// is advanced under a mutex.
//
   iMutex.Lock();
   Instance = iNum++;
   iMutex.UnLock();
}
/******************************************************************************/
/* D e s t r u c t o r */
/******************************************************************************/
// Tear down a node: mark it offline, release the node lock if it is still
// held, detach from the cluster ID object and free all owned strings.
//
XrdCmsNode::~XrdCmsNode()
{

// Flag the node as offline (STMutex not needed here) and drop a held lock
//
   isOffline = 1;
   if (isLocked) UnLock();

// Remove ourselves from the cluster ID object, if we have one
//
   if (cidP)
      {cidP->RemNode(this);
       cidP = 0;
      }

// Release the strings we own; free() ignores null pointers
//
   free(Ident);
   free(myNID);
   free(myName);
}
/******************************************************************************/
/* s e t N a m e */
/******************************************************************************/
// Establish (or re-establish) the node's name, network address, interface
// information and identification string from a link.
//
// lnkp   - the link supplying the host name, network address and ID. It is
//          dereferenced unconditionally and so must not be null.
// theIF  - optional interface specification; ignored for out of domain nodes.
// port   - the data port; when zero the identification is just the link ID,
//          otherwise it is "<link ID>:<port>".
//
void XrdCmsNode::setName(XrdLink *lnkp, const char *theIF, int port)
{
   char buff[512];
   const char *hname = lnkp->Host();

// Check if this is a duplicate. Note that we check for strict equivalence
// (same host name, same port and same network address). If it is not a
// duplicate the old name is discarded and replaced below.
//
   if (myName)
      {if (!strcmp(myName,hname) && port == netIF.Port()
       &&  netID.Same(lnkp->NetAddr())) return;
       free(myName);
      }

// Get our address information but substitute data port for actual port
//
   netID = *(lnkp->NetAddr());

// Set the network interface. Note that out of domain nodes are not allowed
// to specify interface addresses as this does not make global sense.
//
   if (theIF && !netIF.InDomain(&netID)) theIF = 0;
   netIF.SetIF(&netID, theIF, port);
   hasNet = netIF.Mask();

// Record the host name and its length
//
   myName = strdup(hname);
   myNlen = strlen(hname);

// Construct our identification. snprintf() bounds the output to the size of
// the local buffer so that an unusually long link ID cannot overrun it.
//
   if (!port) snprintf(buff, sizeof(buff), "%s", lnkp->ID);
      else    snprintf(buff, sizeof(buff), "%s:%d", lnkp->ID, port);
   if (Ident) free(Ident);
   Ident = strdup(buff);
}
/******************************************************************************/
/* D e l e t e */
/******************************************************************************/
void XrdCmsNode::Delete(XrdSysMutex &gMutex)
{
EPNAME("Delete");
time_t tNow;
unsigned int theLKCnt;
int tmoWait = 60, totWait = 0;
bool doDel = true;
// We need to make sure there are no references to this object. This is true
// when the lkCount equals the ulCount. The lkCount is under control of the
// global mutex passed to us. The ulCount is under control of the node lock.
// we will wait until they are equal. As this node has been removed from all
// tables at this point, the lkCount cannot increase but it may decrease when
// Ref2G() is called which happens for non-lock-free operations (e.g. Send).
// However, we will refresh it if we timeout.
//
gMutex.Lock();
theLKCnt = lkCount;
gMutex.UnLock();
// Get the node lock and do some debugging. Set the isGone flag even though it
// should be set. We need to do that under the node lock to make sure we get
// signalled whenever the node gets unlocked by some thread (ulCount changed).
//
nodeMutex.Lock();
isGone = 1;
DEBUG(Ident <<" locks=" <setEtext(reason);
Link->Close(1);
isConn = 0;
}
// Unlock ourselves if we locked ourselves
//
if (needLock) nodeMutex.UnLock();
}
/******************************************************************************/
/* d o _ A v a i l */
/******************************************************************************/
// Node responses to space usage requests from a manager are localized to the
// cell and need not be propagated in any direction.
//
const char *XrdCmsNode::do_Avail(XrdCmsRRData &Arg)
{
EPNAME("do_Avail")
// Process: avail
//
DiskFree = Arg.dskFree;
DiskUtil = static_cast(Arg.dskUtil);
// Do some debugging
//
DEBUGR(DiskFree <<"MB free; " <Chmod(Arg.Path, mode);
// Return appropriate result
//
return (rc ? fsFail(Arg.Ident, "chmod", Arg.Path, rc) : 0);
}
/******************************************************************************/
/* d o _ D i s c */
/******************************************************************************/
// Handle a disconnect request. When we are running as a manager the request
// is echoed back over the link before the link is closed; otherwise the link
// is simply closed. The returned "." tells the caller a disconnect occurred.
//
const char *XrdCmsNode::do_Disc(XrdCmsRRData &Arg)
{
   XrdLink *const lnk = Link;

// Log the fact that the other side asked to be disconnected
//
   Say.Emsg("Node", lnk->Name(), "requested a disconnect");

// Managers reflect the disc request before hanging up
//
   if (Config.asManager())
      {char *reqP = (char *)&Arg.Request;
       lnk->Send(reqP, sizeof(Arg.Request));
      }

// Mark the node offline (STMutex not needed here) and close the link
//
   isOffline = 1;
   lnk->Close(1);

// Signal disconnect
//
   return ".";
}
/******************************************************************************/
/* d o _ G o n e */
/******************************************************************************/
// Handle a gone request (a file is no longer available on a node). The local
// state is updated and, when we have managers and the update was news to us,
// the request is passed on to all of them. Always returns 0.
//
const char *XrdCmsNode::do_Gone(XrdCmsRRData &Arg)
{
   EPNAME("do_Gone")
   static const SMask_t allNodes(~0);
   int newgone = 1;

// Do some debugging
//
   TRACER(Files,Arg.Path);

// As a manager, remove the file from the cache; with a distributed file
// system the removal applies to all nodes, otherwise only to this node.
// When not a manager there is no cache (this is also the case when we are
// called via the admin end-point interface): the request is always treated
// as new and, on a staging node, the file is dropped from the prepare queue.
//
   if (!Config.asManager())
      {if (Config.DiskSS) PrepQ.Gone(Arg.Path);}
      else {XrdCmsSelect Sel(XrdCmsSelect::Advisory, Arg.Path, Arg.PathLen-1);
            newgone = Cache.DelFile(Sel, baseFS.isDFS() ? allNodes : NodeMask);
           }

// Back-propagate the gone to all of our managers, provided that we have
// managers and that the gone actually changed something.
//
   if (XrdCmsManager::Present() && newgone)
      XrdCmsManager::Inform(Arg.Request, Arg.Buff, Arg.Dlen);

// All done
//
   return 0;
}
/******************************************************************************/
/* d o _ H a v e */
/******************************************************************************/
// When a manager receives a have request it is propagated if we are subscribed
// and we have not sent a have request in the immediate past.
//
const char *XrdCmsNode::do_Have(XrdCmsRRData &Arg)
{
EPNAME("do_Have")
static const SMask_t allNodes(~0);
XrdCmsPInfo pinfo;
int isnew, Opts;
// Do some debugging
//
TRACER(Files, (Arg.Request.modifier&CmsHaveRequest::Pending ? "P ":"")
<
// 0 1 2 3 4 5 6
pcpu = static_cast(Arg.Opaque[CmsLoadRequest::cpuLoad]);
pnet = static_cast(Arg.Opaque[CmsLoadRequest::netLoad]);
pxeq = static_cast(Arg.Opaque[CmsLoadRequest::xeqLoad]);
pmem = static_cast(Arg.Opaque[CmsLoadRequest::memLoad]);
ppag = static_cast(Arg.Opaque[CmsLoadRequest::pagLoad]);
pdsk = static_cast(Arg.Opaque[CmsLoadRequest::dskLoad]);
// Compute actual load value
//
myLoad = Meter.calcLoad(pcpu, pnet, pxeq, pmem, ppag);
myMass = Meter.calcLoad(myLoad, pdsk);
DiskFree = Arg.dskFree;
DiskUtil = pdsk;
// Do some debugging
//
DEBUGR("cpu=" <> CmsLocateRequest::kYR_retipsft];
// Indicate whether we want a name or an actual address
//
lsopts = (Arg.Opts & CmsLocateRequest::kYR_retname
? XrdCmsCluster::LS_IDNT : XrdCmsCluster::LS_IPO);
// Indicate if only a single server entry should be listed
//
if (Arg.Opts & CmsLocateRequest::kYR_retuniq && baseFS.isDFS())
{lsuniq = true; *toP++='u';}
// Indicate whether we can ignore network restrictions
//
if (Arg.Opts & CmsLocateRequest::kYR_listall)
lsopts |= XrdCmsCluster::LS_ANY;
// Indicate whether we only want to list a single entry
//
// Handle private networks here
//
if (Arg.Opts & CmsLocateRequest::kYR_prvtnet)
{XrdNetIF::Privatize(ifType);
*toP++='P';
}
// Encode if type into the options
//
Sel.Opts = static_cast(ifType) & XrdCmsSelect::ifWant;
lsopts = static_cast(lsopts | ifType);
// Grab various options
//
if (Arg.Opts & CmsLocateRequest::kYR_refresh)
{Sel.Opts = XrdCmsSelect::Refresh; *toP++='s';}
if (Arg.Opts & CmsLocateRequest::kYR_asap)
{Sel.Opts |= XrdCmsSelect::Asap; *toP++='i'; Sel.InfoP = &reqInfo;
reqInfo.lsLU = static_cast(lsopts);
}
else Sel.InfoP = 0;
// Do some debugging
//
*toP = '\0';
DEBUGR(theopts <<' ' < 0)
{Arg.Request.rrCode = kYR_wait;
bytes = sizeof(Resp.Val); Why = "delay ";
} else {
if (rc == -2) return 0;
Arg.Request.rrCode = kYR_error;
rc = kYR_ENOENT; Why = "miss ";
bytes = strlcpy(Resp.outbuff, "No servers have access to the file",
sizeof(Resp.outbuff)) + sizeof(Resp.Val) + 1;
}
} else {Why = "?"; bytes = 0;}
// List the servers
//
if (!rc)
{if (!Sel.Vec.hf || !(sP=Cluster.List(Sel.Vec.hf, lsopts, oksel)))
{const char *eTxt;
Arg.Request.rrCode = kYR_error;
if (oksel)
{rc = kYR_ENETUNREACH; Why = "unreachable ";
sprintf(eBuff, "No servers are reachable via %s network",
XrdNetIF::Name(ifType));
eTxt = eBuff;
} else {
rc = kYR_ENOENT; Why = "none ";
eTxt = "No servers have the file";
}
bytes = strlcpy(Resp.outbuff, eTxt,
sizeof(Resp.outbuff)) + sizeof(Resp.Val) + 1;
} else rc = 0;
}
// Either prepare to send an error or format the result
//
if (rc)
{Resp.Val = htonl(rc);
DEBUGR(Why <Send(ioV, 2, bytes+sizeof(Arg.Request));
return 0;
}
/******************************************************************************/
/* Static d o _ L o c F m t */
/******************************************************************************/
int XrdCmsNode::do_LocFmt(char *buff, XrdCmsSelected *sP,
SMask_t pfVec, SMask_t wfVec, bool lsall, bool lsuniq)
{
static const int Skip = (XrdCmsSelected::Disable | XrdCmsSelected::Offline);
static const int Hung = (XrdCmsSelected::Disable | XrdCmsSelected::Offline
| XrdCmsSelected::Suspend);
XrdCmsSelected *pP;
char *oP = buff;
// If only unique entries are wanted then we need to only let through
// all non-servers and one server (prefereably a r/w one)
//
if (!lsall && lsuniq)
{XrdCmsSelected *xP = 0;
bool haverw = false;
pP = sP;
while(pP)
{if (!(pP->Status & (XrdCmsSelected::isMangr | Skip)))
{if (haverw) pP->Status |= Skip;
else {if (xP) xP->Status |= Skip;
xP = pP;
haverw = (pP->Mask & wfVec) != 0;
}
}
pP = pP->next;
}
}
// format out the request as follows:
// 01234567810123456789212345678
// xy[::123.123.123.123]:123456
//
if (lsall)
while(sP)
{*oP = (sP->Status & XrdCmsSelected::isMangr ? 'M' : 'S');
if (sP->Status & Hung) *oP = tolower(*oP);
*(oP+1) = (sP->Mask & wfVec ? 'w' : 'r');
strcpy(oP+2, sP->Ident); oP += sP->IdentLen + 2;
if (sP->next) *oP++ = ' ';
pP = sP; sP = sP->next; delete pP;
}
else
while(sP)
{if (!(sP->Status & Skip))
{*oP = (sP->Status & XrdCmsSelected::isMangr ? 'M' : 'S');
if (sP->Mask & pfVec) *oP = tolower(*oP);
*(oP+1) = (sP->Mask & wfVec ? 'w' : 'r');
strcpy(oP+2, sP->Ident); oP += sP->IdentLen + 2;
if (sP->next) *oP++ = ' ';
}
pP = sP; sP = sP->next; delete pP;
}
// Send of the result
//
*oP = '\0';
return (oP - buff);
}
/******************************************************************************/
/* d o _ M k d i r */
/******************************************************************************/
// Mkdir requests are forwarded to all subscribers
//
const char *XrdCmsNode::do_Mkdir(XrdCmsRRData &Arg)
{
EPNAME("do_Mkdir")
mode_t mode = 0;
int rc;
// Do some debugging
//
DEBUGR("mode " <Mkdir(Arg.Path, mode);
// Return appropriate result
//
return (rc ? fsFail(Arg.Ident, "mkdir", Arg.Path, rc) : 0);
}
/******************************************************************************/
/* d o _ M k p a t h */
/******************************************************************************/
// Mkpath requests are forwarded to all subscribers
//
const char *XrdCmsNode::do_Mkpath(XrdCmsRRData &Arg)
{
EPNAME("do_Mkpath")
mode_t mode = 0;
int rc;
// Do some debugging
//
DEBUGR("mode " <Mkdir(Arg.Path, mode, 1);
// Return appropriate result
//
return (rc ? fsFail(Arg.Ident, "mkpath", Arg.Path, rc) : 0);
}
/******************************************************************************/
/* d o _ M v */
/******************************************************************************/
// Mv requests are forwarded to all subscribers
//
const char *XrdCmsNode::do_Mv(XrdCmsRRData &Arg)
{
EPNAME("do_Mv")
static const SMask_t allNodes(~0);
int rc;
// Do some debugging
//
DEBUGR(Arg.Path <<" to " < 0) {Arg.waitVal = rc; return "!mv";}
else if (Sel2.Vec.hf)
{Say.Emsg("do_Mv",Arg.Path2,"exists; mv failed for",Arg.Path);
return "target file exists";
}
}
Cache.DelFile(Sel2, allNodes);
Cache.DelFile(Sel1, allNodes);
return 0;
}
// Rename the file via call-out or oss plug-in (we used to do this via a requeue
// to the local xrootd but it's no longer necessary).
//
if (Config.ProgMV) rc = fsExec(Config.ProgMV, Arg.Path, Arg.Path2);
else rc = Config.ossFS->Rename(Arg.Path, Arg.Path2);
// Return appropriate result
//
return (rc ? fsFail(Arg.Ident, "mv", Arg.Path, rc) : 0);
}
/******************************************************************************/
/* d o _ P i n g */
/******************************************************************************/
// Answer a ping with a pong over the node's link. Pings are handled locally
// and are never passed on. A node whose isBad flags include isDoomed does not
// answer; ".redirected" is returned for it instead.
//
const char *XrdCmsNode::do_Ping(XrdCmsRRData &Arg)
{
   static CmsPongRequest pongIt = {{0, kYR_pong, 0, 0}};

// Process: ping
// Respond: pong
//
   const bool doomed = (isBad & isDoomed) != 0;
   if (!doomed)
      {Link->Send((char *)&pongIt, sizeof(pongIt));
       return 0;
      }
   return ".redirected";
}
/******************************************************************************/
/* d o _ P o n g */
/******************************************************************************/
// A pong is the answer to a ping. It is local to the cell, never propagated
// and requires no action; a null pointer (no error) is always returned.
//
const char *XrdCmsNode::do_Pong(XrdCmsRRData &Arg)
{
// Process: pong
// Responds: n/a
//
   const char *const noError = 0;
   return noError;
}
/******************************************************************************/
/* d o _ P r e p A d d */
/******************************************************************************/
const char *XrdCmsNode::do_PrepAdd(XrdCmsRRData &Arg)
{
EPNAME("do_PrepAdd")
// Do some debugging
//
DEBUGR("parms: " <Queue();
return 0;
}
/******************************************************************************/
/* d o _ P r e p D e l */
/******************************************************************************/
const char *XrdCmsNode::do_PrepDel(XrdCmsRRData &Arg)
{
EPNAME("do_PrepDel")
// Do some debugging
//
DEBUGR("reqid " <Unlink(Arg.Path);
// Return appropriate result
//
return (rc ? fsFail(Arg.Ident, "rm", Arg.Path, rc) : 0);
}
/******************************************************************************/
/* d o _ R m d i r */
/******************************************************************************/
// Handle a request to remove a directory. Without local data the directory is
// only removed from the cache. Otherwise it is removed by the configured
// call-out program or, when there is none, by the oss plug-in. Returns 0 on
// success and the result of fsFail() when the removal failed.
//
const char *XrdCmsNode::do_Rmdir(XrdCmsRRData &Arg)
{
   EPNAME("do_Rmdir")
   static const SMask_t allNodes(~0);

// Do some debugging
//
   DEBUGR(Arg.Path);

// If we have no data then all we need to do is forget about the directory
//
   if (!Config.DiskOK)
      {XrdCmsSelect Sel(0, Arg.Path, strlen(Arg.Path));
       Cache.DelFile(Sel, allNodes);
       return 0;
      }

// Remove the directory either via call-out or the oss plug-in (we used to
// do this by requeuing the request to the local xrootd; no longer needed).
//
   const int rc = (Config.ProgRD ? fsExec(Config.ProgRD, Arg.Path)
                                 : Config.ossFS->Remdir(Arg.Path));

// Return appropriate result
//
   if (!rc) return 0;
   return fsFail(Arg.Ident, "rmdir", Arg.Path, rc);
}
/******************************************************************************/
/* d o _ S e l e c t */
/******************************************************************************/
// A select request comes from a redirector and is handled locally within the
// cell. This may cause "state" requests to be broadcast to subscribers.
//
const char *XrdCmsNode::do_Select(XrdCmsRRData &Arg)
{
EPNAME("do_Select")
// kXR_NotFound kXR_IOError kXR_FSError kXR_ServerError
static int rtEC[] = {kYR_ENOENT, kYR_EIO, kYR_FSError, kYR_SrvError};
XrdCmsRRQInfo reqInfo(Instance,RSlot,Arg.Request.streamid,Config.QryMinum);
XrdCmsSelect Sel(XrdCmsSelect::Peers, Arg.Path, Arg.PathLen-1);
struct iovec ioV[2];
char theopts[16], *Avoid, *toP = theopts;
XrdNetIF::ifType ifType;
int rc, bytes;
// Init select data (note that refresh suppresses fast redirects)
//
Sel.iovP = 0; Sel.iovN = 0; Sel.InfoP = &reqInfo;
// Determine what interface to return to the client
//
ifType = ifVec[(Arg.Opts & CmsSelectRequest::kYR_retipmsk)
>> CmsSelectRequest::kYR_retipsft];
if (Arg.Opts & CmsSelectRequest::kYR_prvtnet)
{XrdNetIF::Privatize(ifType); *toP++='P';}
Sel.Opts |= static_cast(ifType) & XrdCmsSelect::ifWant;
// Complete the arguments to select
//
if (Arg.Opts & CmsSelectRequest::kYR_refresh)
{Sel.Opts |= XrdCmsSelect::Refresh; *toP++='s';}
if (Arg.Opts & CmsSelectRequest::kYR_online)
{Sel.Opts |= XrdCmsSelect::Online; *toP++='o';}
if (Arg.Opts & CmsSelectRequest::kYR_stat)
{Sel.Opts |= XrdCmsSelect::noBind; *toP++='x';}
else {if (Arg.Opts & CmsSelectRequest::kYR_trunc)
{Sel.Opts |= XrdCmsSelect::Write | XrdCmsSelect::Trunc; *toP++='t';}
if (Arg.Opts & CmsSelectRequest::kYR_write)
{Sel.Opts |= XrdCmsSelect::Write; *toP++='w';
if (Arg.Opts & CmsSelectRequest::kYR_mwfiles || !Config.DoMWChk)
{Sel.Opts |= XrdCmsSelect::MWFiles; *(toP-1)='W';}
}
if (Arg.Opts & CmsSelectRequest::kYR_metaop)
{Sel.Opts |= XrdCmsSelect::Write|XrdCmsSelect::isMeta; *toP++='m';}
if (Arg.Opts & CmsSelectRequest::kYR_create)
{Sel.Opts |= XrdCmsSelect::Write|XrdCmsSelect::NewFile; *toP++='c';
if (Arg.Opts & CmsSelectRequest::kYR_replica)
{Sel.Opts |= XrdCmsSelect::Replica; *toP++='+';}
}
}
*toP = '\0';
// If the client can override selection mode, check if this has been done. Note
// that true packed selection turns off fast redirect.
//
if (Config.sched_Force || !(Arg.Opts & CmsSelectRequest::kYR_aSpec))
{if (Config.sched_Pack)
{Sel.Opts |= XrdCmsSelect::Pack;
if (Config.sched_Pack > 1) Sel.InfoP = 0;
if (!Config.sched_Level) Sel.Opts |= XrdCmsSelect::UseRef;
}
} else {
if (Arg.Opts & CmsSelectRequest::kYR_aPack)
{Sel.Opts |= XrdCmsSelect::Pack;
if (Arg.Opts & CmsSelectRequest::kYR_aWait) Sel.InfoP = 0;
if ((Arg.Opts & CmsSelectRequest::kYR_aPack) ==
CmsSelectRequest::kYR_aStrict)
Sel.Opts |= XrdCmsSelect::UseRef;
}
}
// Check if an avoid node present. If so, this is ineligible for fast redirect.
//
Sel.nmask = SMask_t(0);
if ((Avoid = Arg.Avoid))
{XrdNetAddr avoidAddr;
char *Comma;
DEBUGR(theopts <<' ' < 0)
{Arg.Request.rrCode = kYR_wait;
Sel.Resp.Port = rc;
Sel.Resp.DLen = 0;
DEBUGR("delay " <> CmsSelectRequest::kYR_trySHFT;
Sel.Resp.Port = rtEC[rtRC];
}
DEBUGR("failed; " < " <Send(ioV, 2, bytes+sizeof(Arg.Request));
return 0;
}
/******************************************************************************/
/* d o _ S e l P r e p */
/******************************************************************************/
int XrdCmsNode::do_SelPrep(XrdCmsPrepArgs &Arg) // Static!!!
{
EPNAME("do_SelPrep")
XrdCmsSelect Sel(XrdCmsSelect::Peers, Arg.path, Arg.pathlen-1);
int rc;
// Complete the arguments to select
//
if ( Arg.options & CmsPrepAddRequest::kYR_fresh)
Sel.Opts |= XrdCmsSelect::Freshen;
if ( Arg.options & CmsPrepAddRequest::kYR_write)
Sel.Opts |= XrdCmsSelect::Write;
if (Arg.options & CmsPrepAddRequest::kYR_stage)
{Sel.iovP = Arg.ioV; Sel.iovN = Arg.iovNum;}
else {Sel.iovP = 0; Sel.iovN = 0;
Sel.Opts |= XrdCmsSelect::Defer;
}
// Setup select data (note that prepare does not allow fast redirect)
//
Sel.InfoP = 0; // No fast redirects
Sel.nmask = SMask_t(0);
// We do not care what interface is being used. This may conflict with a
// staging prepare but it's too complicated to handle at this point.
//
Sel.Opts |= static_cast(XrdNetIF::ifAny);
// Check if co-location wanted relevant only when staging wanted
//
if (Arg.clPath && Sel.iovP)
{XrdCmsSelect Scl(XrdCmsSelect::Peers, Arg.clPath, strlen(Arg.clPath));
Scl.iovP = 0; Scl.iovN = 0; Scl.InfoP = 0; Scl.nmask = SMask_t(0);
DEBUGR("colocating " < 0) {Sched->Schedule((XrdJob *)&Arg, rc+time(0));
DEBUGR("coloc to " < 0)
{if (!(Arg.options & CmsPrepAddRequest::kYR_stage)) return 0;
Sched->Schedule((XrdJob *)&Arg, rc+time(0));
DEBUGR("prep delayed " < space
// Respond: avail
//
maxfr = Meter.FreeSpace(tutil);
// Do some debugging
//
DEBUGR(maxfr <<"MB free; " <(blen));
// Send the response
//
if (Arg.Request.rrCode != kYR_space)
XrdCmsManager::Inform(mySpace.Hdr, buff, blen);
else {xmsg[0].iov_base = (char *)&mySpace;
xmsg[0].iov_len = sizeof(mySpace);
xmsg[1].iov_base = buff;
xmsg[1].iov_len = blen;
mySpace.Hdr.datalen = htons(static_cast(blen));
Link->Send(xmsg, 2);
}
return 0;
}
/******************************************************************************/
/* d o _ S t a t e */
/******************************************************************************/
// Handle a state request, which asks whether we have a file. A node flagged
// as a manager (isMan) resolves the request via do_StateFWD(); any other node
// asks the underlying file system. When the file is found, and the requestor
// did not ask for silence (kYR_noresp), a have response is sent back over the
// link. Always returns 0.
//
const char *XrdCmsNode::do_State(XrdCmsRRData &Arg)
{
   EPNAME("do_State")
   struct iovec xmsg[2];
   int rc, noResp = Arg.Request.modifier & CmsStateRequest::kYR_noresp;

// Do some debugging
//
   TRACER(Files,Arg.Path);

// Process: state
// Respond: have
//
   isKnown = 1;

// Determine whether we have the file; if we do, the request modifier is
// replaced by the (non-zero) result. If we do not, we are done.
//
   if (isMan)
      {Arg.Request.modifier = do_StateFWD(Arg);
       if (!Arg.Request.modifier) return 0;
      }
      else {if (!Config.DiskOK && !Config.asProxy()) return 0;
            if (baseFS.Limit()
            &&  Arg.Request.modifier&CmsStateRequest::kYR_metaman)
               {XrdCmsPInfo pinfo;
                pinfo.rovec = NodeMask;
                rc = baseFS.Exists(Arg,pinfo);
               }
               else rc = baseFS.Exists(Arg.Path, -(Arg.PathLen-1));
            if (rc <= 0) return 0;
            Arg.Request.modifier = rc;
           }

// Respond appropriately: the request header is turned into a have response
// and sent back along with the original request data.
//
   if (Arg.Request.modifier && !noResp)
      {TRACER(Files,Arg.Path <<" responding have!");
       Arg.Request.rrCode    = kYR_have;
       Arg.Request.modifier |= kYR_raw;
       xmsg[0].iov_base = (char *)&Arg.Request;
       xmsg[0].iov_len  = sizeof(Arg.Request);
       xmsg[1].iov_base = Arg.Buff;
       xmsg[1].iov_len  = Arg.Dlen;
       Link->Send(xmsg, 2);
      }
   return 0;
}
/******************************************************************************/
/* d o _ S t a t e D F S */
/******************************************************************************/
void XrdCmsNode::do_StateDFS(XrdCmsBaseFR *rP, int rc)
{
EPNAME("StateDFs");
static const SMask_t allNodes(~0);
CmsRRHdr Request = {rP->Sid, 0, (kXR_char)(rP->Mod | kYR_raw), 0};
XrdCmsSelect Sel(0, rP->Path, rP->PathLen);
int isNew;
// Do some debugging and record the hash code.
//
DEBUG((rP->Mod & CmsStateRequest::kYR_metaman ? "met " : "man ") <Mod) <Send(ioV, 3, bytes+sizeof(Arg.Request));
return 0;
}
/******************************************************************************/
/* d o _ S t a t s */
/******************************************************************************/
// Handle a statistics request. The response is either just the size of the
// statistics buffer (when the kYR_size modifier is present) or the buffer
// size followed by the statistics text. The statistics buffer is shared by
// all callers, is protected by a function-local mutex and is refreshed no
// more often than about once every nine seconds. Always returns 0.
//
const char *XrdCmsNode::do_Stats(XrdCmsRRData &Arg)
{
   static const unsigned short szLen = sizeof(kXR_unt32);
   static XrdSysMutex StatsData;
   static int         statsz   = 0;
   static int         statln   = 0;
   static char       *statbuff = 0;
   static time_t      statlast = 0;
   static kXR_unt32   theSize;
   struct iovec  ioV[3] = {{(char *)&Arg.Request, sizeof(Arg.Request)},
                           {(char *)&theSize,     sizeof(theSize)},
                           {0, 0}
                          };
   time_t tNow;

// Allocate buffer if we do not have one. Calling Stats() without a buffer
// yields the size that is to be allocated.
//
   StatsData.Lock();
   if (!statsz || !statbuff)
      {statsz   = Cluster.Stats(0,0);
       statbuff = (char *)malloc(statsz);
       theSize  = htonl(statsz);
      }

// Check if only the size is wanted
//
   if (Arg.Request.modifier & CmsStatsRequest::kYR_size)
      {ioV[1].iov_len = sizeof(theSize);
       Arg.Request.datalen = htons(szLen);
       Arg.Request.rrCode  = kYR_data;
       Link->Send(ioV, 2);
       StatsData.UnLock();
       return 0;
      }

// Get full statistics if enough time has passed since the last refresh. The
// test used to be inverted (statlast+9 >= tNow); as statlast starts out at
// zero that was never true, so no statistics were ever generated.
//
   tNow = time(0);
   if (statlast+9 <= tNow)
      {statln = Cluster.Stats(statbuff, statsz); statlast = tNow;}

// Format result and send response
//
   ioV[2].iov_base = statbuff;
   ioV[2].iov_len  = statln;
   Arg.Request.datalen = htons(static_cast<unsigned short>(szLen+statln));
   Arg.Request.rrCode  = kYR_data;
   Link->Send(ioV, 3);

// All done
//
   StatsData.UnLock();
   return 0;
}
/******************************************************************************/
/* d o _ S t a t u s */
/******************************************************************************/
// the reset request is propagated to all of our managers. A special reset case
// is sent when a subscribed supervisor adds a new node. This causes all cache
// lines for the supervisor to be marked suspect. Status change requests are
// propagated to upper-level managers only if the summary state has changed.
//
const char *XrdCmsNode::do_Status(XrdCmsRRData &Arg)
{
EPNAME("do_Status")
const char *srvMsg, *stgMsg;
int Stage = Arg.Request.modifier & CmsStatusRequest::kYR_Stage;
int noStage = Arg.Request.modifier & CmsStatusRequest::kYR_noStage;
int Resume = Arg.Request.modifier & CmsStatusRequest::kYR_Resume;
int Suspend = Arg.Request.modifier & CmsStatusRequest::kYR_Suspend;
int Reset = Arg.Request.modifier & CmsStatusRequest::kYR_Reset;
int add2Activ, add2Stage, port;
// Do some debugging
//
DEBUGR( (Reset ? "reset " : "")
<<(Resume ? "resume " : (Suspend ? "suspend " : ""))
<<(Stage ? "stage " : (noStage ? "nostage " : "")));
// Process reset requests. These are exclusive to any other request
//
if (Reset)
{XrdCmsManager::Reset(); // Propagate the reset to our managers
Cache.Bounce(NodeMask, NodeID); // Now invalidate our cache lines
}
// Process stage/nostage
//
if ((Stage && isNoStage) || (noStage && !isNoStage))
if (noStage) {add2Stage = -1; isNoStage = 1; stgMsg="staging suspended";}
else {add2Stage = 1; isNoStage = 0; stgMsg="staging resumed";}
else {add2Stage = 0; stgMsg = 0;}
// Process suspend/resume
//
if ((Resume && (isBad & isSuspend)) || (Suspend && !(isBad & isSuspend)))
if (Suspend) {add2Activ = -1; isBad |= isSuspend;
srvMsg="service suspended";
stgMsg = 0;
}
else {add2Activ = 1; isBad &= ~isSuspend;
srvMsg="service resumed";
stgMsg = (isNoStage ? "(no staging)" : "(staging)");
port = ntohl(Arg.Request.streamid);
if (port && port != netIF.Port())
{Lock(false); netIF.Port(port); UnLock();
DEBUGR("set data port to " <Truncate(Arg.Path, Size);
// Return appropriate result
//
return (rc ? fsFail(Arg.Ident, "trunc", Arg.Path, rc) : 0);
}
/******************************************************************************/
/* d o _ T r y */
/******************************************************************************/
// Try requests from a manager indicate that we are being displaced and should
// hunt for another manager. The request provides hints as to where to try;
// these are added to the list of alternate managers when a manager object is
// associated with this node. ".redirected" is always returned.
// Note that this method is no longer called but handled in XrdCmsProtocol!
//
const char *XrdCmsNode::do_Try(XrdCmsRRData &Arg)
{
   EPNAME("do_Try")
   static const char *const isRedir = ".redirected";

// Do some debugging
//
   DEBUGR(Arg.Path);

// Without a manager object there is no alternate list to update
//
   if (!Manager) return isRedir;

// Add all the alternates to our alternate list
//
   Manager->myMans->Add(&netID, Arg.Path, Config.PortTCP, myLevel);

// Tell the caller that we have been redirected
//
   return isRedir;
}
/******************************************************************************/
/* d o _ U p d a t e */
/******************************************************************************/
// Handle an update request by sending our current state over the node's link.
// Always returns 0.
//
const char *XrdCmsNode::do_Update(XrdCmsRRData &Arg)
{
   XrdLink *const lnk = Link;

// Process: update
// Respond: status
//
   CmsState.sendState(lnk);
   return 0;
}
/******************************************************************************/
/* d o _ U s a g e */
/******************************************************************************/
// Handle a usage request by reporting our load over the node's link. Usage
// requests are local to the cell and are never propagated. Always returns 0.
//
const char *XrdCmsNode::do_Usage(XrdCmsRRData &Arg)
{
   XrdLink *const lnk = Link;

// Process: usage
// Respond: load
//
   Report_Usage(lnk);
   return 0;
}
/******************************************************************************/
/* R e p o r t _ U s a g e */
/******************************************************************************/
void XrdCmsNode::Report_Usage(XrdLink *lp) // Static!
{
EPNAME("Report_Usage")
CmsLoadRequest myLoad = {{0, kYR_load, 0, 0}};
struct iovec xmsg[2];
char loadbuff[CmsLoadRequest::numLoad];
char respbuff[sizeof(loadbuff)+2+sizeof(int)+2], *bp = respbuff;
int blen, maxfr, pcpu, pnet, pxeq, pmem, ppag, pdsk;
// Respond: load
//
maxfr = Meter.Report(pcpu, pnet, pxeq, pmem, ppag, pdsk);
loadbuff[CmsLoadRequest::cpuLoad] = static_cast(pcpu);
loadbuff[CmsLoadRequest::netLoad] = static_cast(pnet);
loadbuff[CmsLoadRequest::xeqLoad] = static_cast(pxeq);
loadbuff[CmsLoadRequest::memLoad] = static_cast(pmem);
loadbuff[CmsLoadRequest::pagLoad] = static_cast(ppag);
loadbuff[CmsLoadRequest::dskLoad] = static_cast(pdsk);
blen = XrdOucPup::Pack(&bp, loadbuff, sizeof(loadbuff));
blen += XrdOucPup::Pack(&bp, maxfr);
myLoad.Hdr.datalen = htons(static_cast(blen));
xmsg[0].iov_base = (char *)&myLoad;
xmsg[0].iov_len = sizeof(myLoad);
xmsg[1].iov_base = respbuff;
xmsg[1].iov_len = blen;
if (lp) lp->Send(xmsg, 2);
else XrdCmsManager::Inform("usage", xmsg, 2);
// Do some debugging
//
DEBUG("cpu=" <> 9)) return 0;
return 1;
}
/******************************************************************************/
/* g e t S i z e */
/******************************************************************************/
// Convert a decimal size specification to a number.
//
// theSize - the null terminated text to convert; a null pointer is treated
//           as an invalid specification.
// Size    - receives the converted value (zero when nothing was converted).
//
// Returns 1 when the whole text is a valid number and 0 otherwise. A size of
// zero is valid; success is decided by whether any characters were converted
// and whether anything is left over, not by the converted value itself.
//
int XrdCmsNode::getSize(const char *theSize, long long &Size)
{
   char *eP = 0;

// Reject a missing specification outright
//
   if (!theSize) {Size = 0; return 0;}

// Convert the size argument
//
   Size = strtoll(theSize, &eP, 10);

// The conversion failed if nothing was consumed (empty or non-numeric text)
// or if unconverted characters follow the number.
//
   if (eP == theSize || *eP) return 0;
   return 1;
}