/***********************************************************************************************/ /* */ /* X r d C m s N o d e . c c */ /* */ /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #include #include #include #include "Xrd/XrdJob.hh" #include "Xrd/XrdLink.hh" #include "XProtocol/YProtocol.hh" #include "XrdCms/XrdCmsBaseFS.hh" #include "XrdCms/XrdCmsCache.hh" #include "XrdCms/XrdCmsCluster.hh" #include "XrdCms/XrdCmsClustID.hh" #include "XrdCms/XrdCmsConfig.hh" #include "XrdCms/XrdCmsManager.hh" #include "XrdCms/XrdCmsManList.hh" #include "XrdCms/XrdCmsMeter.hh" #include "XrdCms/XrdCmsPList.hh" #include "XrdCms/XrdCmsPrepare.hh" #include "XrdCms/XrdCmsRRData.hh" #include "XrdCms/XrdCmsNode.hh" #include "XrdCms/XrdCmsSelect.hh" #include "XrdCms/XrdCmsState.hh" #include "XrdCms/XrdCmsTrace.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucName2Name.hh" #include "XrdOuc/XrdOucProg.hh" #include "XrdOuc/XrdOucPup.hh" #include "XrdOuc/XrdOucUtils.hh" #include "XrdNet/XrdNetUtils.hh" #include "XrdSys/XrdSysPlatform.hh" using namespace XrdCms; /******************************************************************************/ /* S t a t i c O b j e c t s */ /******************************************************************************/ XrdSysMutex XrdCmsNode::mlMutex; int XrdCmsNode::LastFree = 0; namespace { XrdNetIF::ifType ifVec[4] = {XrdNetIF::PublicV4, XrdNetIF::Public46, XrdNetIF::PublicV6, XrdNetIF::Public64}; }; /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdCmsNode::XrdCmsNode(XrdLink *lnkp, const char *theIF, const char *nid, int port, int lvl, int id) : nodeMutex(0, "nodeCV") { static XrdSysMutex iMutex; static const SMask_t smask_1(1); static int iNum = 1; Link = lnkp; NodeMask = (id < 0 ? 0 : smask_1 << id); NodeID = id; cidP = 0; hasNet = 0; isBad = 0; isOffline= (lnkp == 0); isNoStage= 0; isBound = 0; isConn = 0; isGone = 0; isPerm = 0; isMan = 0; isKnown = 0; isPeer = 0; incUL = 0; myCost = 0; myLoad = 0; myMass = 0; DiskTotal= 0; DiskFree = 0; DiskMinF = 0; DiskNums = 0; DiskUtil = 0; Next = 0; RefW = 0; RefTotW = 0; RefR = 0; RefTotR = 0; Share = 0; Shrem = 0; Shrin = 0; logload = Config.LogPerf; DropTime = 0; DropJob = 0; myName = 0; myNlen = 0; Ident = 0; myNID = strdup(nid ? nid : "?"); if ((myCID = index(myNID, ' '))) myCID++; else myCID = myNID; myLevel = lvl; ConfigID = 0; TZValid = 0; TimeZone = 0; subsPort = 0; myVersion= kYR_Version; lkCount = 0; ulCount = 0; Manager = 0; // setName() will set the node identification information // setName(lnkp, theIF, (nid ? port : 0)); iMutex.Lock(); Instance = iNum++; iMutex.UnLock(); } /******************************************************************************/ /* D e s t r u c t o r */ /******************************************************************************/ XrdCmsNode::~XrdCmsNode() { isOffline = 1; // STMutex not needed here if (isLocked) UnLock(); // Delete other appendages // if (cidP) {cidP->RemNode(this); cidP = 0;} if (Ident) free(Ident); if (myNID) free(myNID); if (myName)free(myName); } /******************************************************************************/ /* s e t N a m e */ /******************************************************************************/ void XrdCmsNode::setName(XrdLink *lnkp, const char *theIF, int port) { char buff[512]; const char *hname = lnkp->Host(); // Check if this is a duplicate. Note that we check for strict equivalence. // if (myName) {if (!strcmp(myName,hname) && port == netIF.Port() && netID.Same(lnkp->NetAddr())) return; free(myName); } // Get our address information but substitute data port for actual port // netID = *(lnkp->NetAddr()); // Set the network interface. Note that out of domain nodes are not allowed // to specify interface addresses as this does not make global sense. // if (theIF && !netIF.InDomain(&netID)) theIF = 0; netIF.SetIF(&netID, theIF, port); hasNet = netIF.Mask(); // Construct our identification // myName = strdup(hname); myNlen = strlen(hname); if (!port) strcpy(buff, lnkp->ID); else sprintf(buff, "%s:%d", lnkp->ID, port); if (Ident) free(Ident); Ident = strdup(buff); } /******************************************************************************/ /* D e l e t e */ /******************************************************************************/ void XrdCmsNode::Delete(XrdSysMutex &gMutex) { EPNAME("Delete"); time_t tNow; unsigned int theLKCnt; int tmoWait = 60, totWait = 0; bool doDel = true; // We need to make sure there are no references to this object. This is true // when the lkCount equals the ulCount. The lkCount is under control of the // global mutex passed to us. The ulCount is under control of the node lock. // we will wait until they are equal. As this node has been removed from all // tables at this point, the lkCount cannot increase but it may decrease when // Ref2G() is called which happens for none lock-free operations (e.g. Send). // However, we will refresh it if we timeout. // gMutex.Lock(); theLKCnt = lkCount; gMutex.UnLock(); // Get the node lock and do some debugging. Set tghe isGone flag even though it // should be set. We need to do that under the node lock to make sure we get // signalled whenever the node gets unlocked by some thread (ulCount changed). // nodeMutex.Lock(); isGone = 1; DEBUG(Ident <<" locks=" <setEtext(reason); Link->Close(1); isConn = 0; } // Unlock ourselves if we locked ourselves // if (needLock) nodeMutex.UnLock(); } /******************************************************************************/ /* d o _ A v a i l */ /******************************************************************************/ // Node responses to space usage requests from a manager are localized to the // cell and need not be propopagated in any direction. // const char *XrdCmsNode::do_Avail(XrdCmsRRData &Arg) { EPNAME("do_Avail") // Process: avail // DiskFree = Arg.dskFree; DiskUtil = static_cast(Arg.dskUtil); // Do some debugging // DEBUGR(DiskFree <<"MB free; " <Chmod(Arg.Path, mode); // Return appropriate result // return (rc ? fsFail(Arg.Ident, "chmod", Arg.Path, rc) : 0); } /******************************************************************************/ /* d o _ D i s c */ /******************************************************************************/ // When a manager receives a disc response from a node it sends a disc request // and then closes the connection. // When a node receives a disc request it simply closes the connection. const char *XrdCmsNode::do_Disc(XrdCmsRRData &Arg) { // Indicate we have received a disconnect // Say.Emsg("Node", Link->Name(), "requested a disconnect"); // If we must send a disc request, do so now // if (Config.asManager()) Link->Send((char *)&Arg.Request,sizeof(Arg.Request)); // Close the link and return an error // isOffline = 1; // STMutex not needed here Link->Close(1); return "."; // Signal disconnect } /******************************************************************************/ /* d o _ G o n e */ /******************************************************************************/ // When a manager receives a gone request it is propogated if we are subscribed // and we have not sent a gone request in the immediate past. // const char *XrdCmsNode::do_Gone(XrdCmsRRData &Arg) { EPNAME("do_Gone") static const SMask_t allNodes(~0); int newgone; // Do some debugging // TRACER(Files,Arg.Path); // Update path information and delete this from the prep queue if we are a // staging node. We can also be called via the admin end-point interface // In this case, we have no cache and simply forward up the request. // if (Config.asManager()) {XrdCmsSelect Sel(XrdCmsSelect::Advisory, Arg.Path, Arg.PathLen-1); newgone = Cache.DelFile(Sel, baseFS.isDFS() ? allNodes : NodeMask); } else { newgone = 1; if (Config.DiskSS) PrepQ.Gone(Arg.Path); } // If we have no managers and we still have the file or never had it, return // if (!XrdCmsManager::Present() || !newgone) return 0; // Back-propogate the gone to all of our managers // XrdCmsManager::Inform(Arg.Request, Arg.Buff, Arg.Dlen); // All done // return 0; } /******************************************************************************/ /* d o _ H a v e */ /******************************************************************************/ // When a manager receives a have request it is propogated if we are subscribed // and we have not sent a have request in the immediate past. // const char *XrdCmsNode::do_Have(XrdCmsRRData &Arg) { EPNAME("do_Have") static const SMask_t allNodes(~0); XrdCmsPInfo pinfo; int isnew, Opts; // Do some debugging // TRACER(Files, (Arg.Request.modifier&CmsHaveRequest::Pending ? "P ":"") < // 0 1 2 3 4 5 6 pcpu = static_cast(Arg.Opaque[CmsLoadRequest::cpuLoad]); pnet = static_cast(Arg.Opaque[CmsLoadRequest::netLoad]); pxeq = static_cast(Arg.Opaque[CmsLoadRequest::xeqLoad]); pmem = static_cast(Arg.Opaque[CmsLoadRequest::memLoad]); ppag = static_cast(Arg.Opaque[CmsLoadRequest::pagLoad]); pdsk = static_cast(Arg.Opaque[CmsLoadRequest::dskLoad]); // Compute actual load value // myLoad = Meter.calcLoad(pcpu, pnet, pxeq, pmem, ppag); myMass = Meter.calcLoad(myLoad, pdsk); DiskFree = Arg.dskFree; DiskUtil = pdsk; // Do some debugging // DEBUGR("cpu=" <> CmsLocateRequest::kYR_retipsft]; // Indicate whether we want a name or an actual address // lsopts = (Arg.Opts & CmsLocateRequest::kYR_retname ? XrdCmsCluster::LS_IDNT : XrdCmsCluster::LS_IPO); // Indicate if only a single server entry should be listed // if (Arg.Opts & CmsLocateRequest::kYR_retuniq && baseFS.isDFS()) {lsuniq = true; *toP++='u';} // Indicate whether we can ignore network restrictions // if (Arg.Opts & CmsLocateRequest::kYR_listall) lsopts |= XrdCmsCluster::LS_ANY; // Indicate whether we ony want to list a single entry // // Handle private networks here // if (Arg.Opts & CmsLocateRequest::kYR_prvtnet) {XrdNetIF::Privatize(ifType); *toP++='P'; } // Encode if type into the options // Sel.Opts = static_cast(ifType) & XrdCmsSelect::ifWant; lsopts = static_cast(lsopts | ifType); // Grab various options // if (Arg.Opts & CmsLocateRequest::kYR_refresh) {Sel.Opts = XrdCmsSelect::Refresh; *toP++='s';} if (Arg.Opts & CmsLocateRequest::kYR_asap) {Sel.Opts |= XrdCmsSelect::Asap; *toP++='i'; Sel.InfoP = &reqInfo; reqInfo.lsLU = static_cast(lsopts); } else Sel.InfoP = 0; // Do some debugging // *toP = '\0'; DEBUGR(theopts <<' ' < 0) {Arg.Request.rrCode = kYR_wait; bytes = sizeof(Resp.Val); Why = "delay "; } else { if (rc == -2) return 0; Arg.Request.rrCode = kYR_error; rc = kYR_ENOENT; Why = "miss "; bytes = strlcpy(Resp.outbuff, "No servers have access to the file", sizeof(Resp.outbuff)) + sizeof(Resp.Val) + 1; } } else {Why = "?"; bytes = 0;} // List the servers // if (!rc) {if (!Sel.Vec.hf || !(sP=Cluster.List(Sel.Vec.hf, lsopts, oksel))) {const char *eTxt; Arg.Request.rrCode = kYR_error; if (oksel) {rc = kYR_ENETUNREACH; Why = "unreachable "; sprintf(eBuff, "No servers are reachable via %s network", XrdNetIF::Name(ifType)); eTxt = eBuff; } else { rc = kYR_ENOENT; Why = "none "; eTxt = "No servers have the file"; } bytes = strlcpy(Resp.outbuff, eTxt, sizeof(Resp.outbuff)) + sizeof(Resp.Val) + 1; } else rc = 0; } // Either prepare to send an error or format the result // if (rc) {Resp.Val = htonl(rc); DEBUGR(Why <Send(ioV, 2, bytes+sizeof(Arg.Request)); return 0; } /******************************************************************************/ /* Static d o _ L o c F m t */ /******************************************************************************/ int XrdCmsNode::do_LocFmt(char *buff, XrdCmsSelected *sP, SMask_t pfVec, SMask_t wfVec, bool lsall, bool lsuniq) { static const int Skip = (XrdCmsSelected::Disable | XrdCmsSelected::Offline); static const int Hung = (XrdCmsSelected::Disable | XrdCmsSelected::Offline | XrdCmsSelected::Suspend); XrdCmsSelected *pP; char *oP = buff; // If only unique entries are wanted then we need to only let through // all non-servers and one server (prefereably a r/w one) // if (!lsall && lsuniq) {XrdCmsSelected *xP = 0; bool haverw = false; pP = sP; while(pP) {if (!(pP->Status & (XrdCmsSelected::isMangr | Skip))) {if (haverw) pP->Status |= Skip; else {if (xP) xP->Status |= Skip; xP = pP; haverw = (pP->Mask & wfVec) != 0; } } pP = pP->next; } } // format out the request as follows: // 01234567810123456789212345678 // xy[::123.123.123.123]:123456 // if (lsall) while(sP) {*oP = (sP->Status & XrdCmsSelected::isMangr ? 'M' : 'S'); if (sP->Status & Hung) *oP = tolower(*oP); *(oP+1) = (sP->Mask & wfVec ? 'w' : 'r'); strcpy(oP+2, sP->Ident); oP += sP->IdentLen + 2; if (sP->next) *oP++ = ' '; pP = sP; sP = sP->next; delete pP; } else while(sP) {if (!(sP->Status & Skip)) {*oP = (sP->Status & XrdCmsSelected::isMangr ? 'M' : 'S'); if (sP->Mask & pfVec) *oP = tolower(*oP); *(oP+1) = (sP->Mask & wfVec ? 'w' : 'r'); strcpy(oP+2, sP->Ident); oP += sP->IdentLen + 2; if (sP->next) *oP++ = ' '; } pP = sP; sP = sP->next; delete pP; } // Send of the result // *oP = '\0'; return (oP - buff); } /******************************************************************************/ /* d o _ M k d i r */ /******************************************************************************/ // Mkdir requests are forwarded to all subscribers // const char *XrdCmsNode::do_Mkdir(XrdCmsRRData &Arg) { EPNAME("do_Mkdir") mode_t mode = 0; int rc; // Do some debugging // DEBUGR("mode " <Mkdir(Arg.Path, mode); // Return appropriate result // return (rc ? fsFail(Arg.Ident, "mkdir", Arg.Path, rc) : 0); } /******************************************************************************/ /* d o _ M k p a t h */ /******************************************************************************/ // Mkpath requests are forwarded to all subscribers // const char *XrdCmsNode::do_Mkpath(XrdCmsRRData &Arg) { EPNAME("do_Mkpath") mode_t mode = 0; int rc; // Do some debugging // DEBUGR("mode " <Mkdir(Arg.Path, mode, 1); // Return appropriate result // return (rc ? fsFail(Arg.Ident, "mkpath", Arg.Path, rc) : 0); } /******************************************************************************/ /* d o _ M v */ /******************************************************************************/ // Mv requests are forwarded to all subscribers // const char *XrdCmsNode::do_Mv(XrdCmsRRData &Arg) { EPNAME("do_Mv") static const SMask_t allNodes(~0); int rc; // Do some debugging // DEBUGR(Arg.Path <<" to " < 0) {Arg.waitVal = rc; return "!mv";} else if (Sel2.Vec.hf) {Say.Emsg("do_Mv",Arg.Path2,"exists; mv failed for",Arg.Path); return "target file exists"; } } Cache.DelFile(Sel2, allNodes); Cache.DelFile(Sel1, allNodes); return 0; } // Rename the file via call-out or oss plug-in (we used to do this via a requeue // to the local xrootd but it's no longer necessary). // if (Config.ProgMV) rc = fsExec(Config.ProgMV, Arg.Path, Arg.Path2); else rc = Config.ossFS->Rename(Arg.Path, Arg.Path2); // Return appropriate result // return (rc ? fsFail(Arg.Ident, "mv", Arg.Path, rc) : 0); } /******************************************************************************/ /* d o _ P i n g */ /******************************************************************************/ // Ping requests from a manager are local to the cell and never propagated. // const char *XrdCmsNode::do_Ping(XrdCmsRRData &Arg) { static CmsPongRequest pongIt = {{0, kYR_pong, 0, 0}}; // Process: ping // Respond: pong // if (isBad & isDoomed) return ".redirected"; Link->Send((char *)&pongIt, sizeof(pongIt)); return 0; } /******************************************************************************/ /* d o _ P o n g */ /******************************************************************************/ // Responses to a ping are local to the cell and never propagated. // const char *XrdCmsNode::do_Pong(XrdCmsRRData &Arg) { // Process: pong // Reponds: n/a return 0; } /******************************************************************************/ /* d o _ P r e p A d d */ /******************************************************************************/ const char *XrdCmsNode::do_PrepAdd(XrdCmsRRData &Arg) { EPNAME("do_PrepAdd") // Do some debugging // DEBUGR("parms: " <Queue(); return 0; } /******************************************************************************/ /* d o _ P r e p D e l */ /******************************************************************************/ const char *XrdCmsNode::do_PrepDel(XrdCmsRRData &Arg) { EPNAME("do_PrepDel") // Do some debugging // DEBUGR("reqid " <Unlink(Arg.Path); // Return appropriate result // return (rc ? fsFail(Arg.Ident, "rm", Arg.Path, rc) : 0); } /******************************************************************************/ /* d o _ R m d i r */ /******************************************************************************/ // Rmdir requests are forwarded to all subscribers // const char *XrdCmsNode::do_Rmdir(XrdCmsRRData &Arg) { EPNAME("do_Rmdir") static const SMask_t allNodes(~0); int rc; // Do some debugging // DEBUGR(Arg.Path); // If we have no data then we should remove this directory from our cache // if (!Config.DiskOK) {XrdCmsSelect Sel(0, Arg.Path, strlen(Arg.Path)); Cache.DelFile(Sel, allNodes); return 0; } // Remove the directory either via call-out or the oss plug-in (we used to // do this by requeing the request to the local xrootd; no longer needed). // if (Config.ProgRD) rc = fsExec(Config.ProgRD, Arg.Path); else rc = Config.ossFS->Remdir(Arg.Path); // Return appropriate result // return (rc ? fsFail(Arg.Ident, "rmdir", Arg.Path, rc) : 0); } /******************************************************************************/ /* d o _ S e l e c t */ /******************************************************************************/ // A select request comes from a redirector and is handled locally within the // cell. This may cause "state" requests to be broadcast to subscribers. // const char *XrdCmsNode::do_Select(XrdCmsRRData &Arg) { EPNAME("do_Select") // kXR_NotFound kXR_IOError kXR_FSError kXR_ServerError static int rtEC[] = {kYR_ENOENT, kYR_EIO, kYR_FSError, kYR_SrvError}; XrdCmsRRQInfo reqInfo(Instance,RSlot,Arg.Request.streamid,Config.QryMinum); XrdCmsSelect Sel(XrdCmsSelect::Peers, Arg.Path, Arg.PathLen-1); struct iovec ioV[2]; char theopts[16], *Avoid, *toP = theopts; XrdNetIF::ifType ifType; int rc, bytes; // Init select data (note that refresh supresses fast redirects) // Sel.iovP = 0; Sel.iovN = 0; Sel.InfoP = &reqInfo; // Determine what interface to return to the client // ifType = ifVec[(Arg.Opts & CmsSelectRequest::kYR_retipmsk) >> CmsSelectRequest::kYR_retipsft]; if (Arg.Opts & CmsSelectRequest::kYR_prvtnet) {XrdNetIF::Privatize(ifType); *toP++='P';} Sel.Opts |= static_cast(ifType) & XrdCmsSelect::ifWant; // Complete the arguments to select // if (Arg.Opts & CmsSelectRequest::kYR_refresh) {Sel.Opts |= XrdCmsSelect::Refresh; *toP++='s';} if (Arg.Opts & CmsSelectRequest::kYR_online) {Sel.Opts |= XrdCmsSelect::Online; *toP++='o';} if (Arg.Opts & CmsSelectRequest::kYR_stat) {Sel.Opts |= XrdCmsSelect::noBind; *toP++='x';} else {if (Arg.Opts & CmsSelectRequest::kYR_trunc) {Sel.Opts |= XrdCmsSelect::Write | XrdCmsSelect::Trunc; *toP++='t';} if (Arg.Opts & CmsSelectRequest::kYR_write) {Sel.Opts |= XrdCmsSelect::Write; *toP++='w'; if (Arg.Opts & CmsSelectRequest::kYR_mwfiles || !Config.DoMWChk) {Sel.Opts |= XrdCmsSelect::MWFiles; *(toP-1)='W';} } if (Arg.Opts & CmsSelectRequest::kYR_metaop) {Sel.Opts |= XrdCmsSelect::Write|XrdCmsSelect::isMeta; *toP++='m';} if (Arg.Opts & CmsSelectRequest::kYR_create) {Sel.Opts |= XrdCmsSelect::Write|XrdCmsSelect::NewFile; *toP++='c'; if (Arg.Opts & CmsSelectRequest::kYR_replica) {Sel.Opts |= XrdCmsSelect::Replica; *toP++='+';} } } *toP = '\0'; // If the client can override selection mode, check if this has been done. Note // that true packed selection turns off fast redirect. // if (Config.sched_Force || !(Arg.Opts & CmsSelectRequest::kYR_aSpec)) {if (Config.sched_Pack) {Sel.Opts |= XrdCmsSelect::Pack; if (Config.sched_Pack > 1) Sel.InfoP = 0; if (!Config.sched_Level) Sel.Opts |= XrdCmsSelect::UseRef; } } else { if (Arg.Opts & CmsSelectRequest::kYR_aPack) {Sel.Opts |= XrdCmsSelect::Pack; if (Arg.Opts & CmsSelectRequest::kYR_aWait) Sel.InfoP = 0; if ((Arg.Opts & CmsSelectRequest::kYR_aPack) == CmsSelectRequest::kYR_aStrict) Sel.Opts |= XrdCmsSelect::UseRef; } } // Check if an avoid node present. If so, this is ineligible for fast redirect. // Sel.nmask = SMask_t(0); if ((Avoid = Arg.Avoid)) {XrdNetAddr avoidAddr; char *Comma; DEBUGR(theopts <<' ' < 0) {Arg.Request.rrCode = kYR_wait; Sel.Resp.Port = rc; Sel.Resp.DLen = 0; DEBUGR("delay " <> CmsSelectRequest::kYR_trySHFT; Sel.Resp.Port = rtEC[rtRC]; } DEBUGR("failed; " < " <Send(ioV, 2, bytes+sizeof(Arg.Request)); return 0; } /******************************************************************************/ /* d o _ S e l P r e p */ /******************************************************************************/ int XrdCmsNode::do_SelPrep(XrdCmsPrepArgs &Arg) // Static!!! { EPNAME("do_SelPrep") XrdCmsSelect Sel(XrdCmsSelect::Peers, Arg.path, Arg.pathlen-1); int rc; // Complete the arguments to select // if ( Arg.options & CmsPrepAddRequest::kYR_fresh) Sel.Opts |= XrdCmsSelect::Freshen; if ( Arg.options & CmsPrepAddRequest::kYR_write) Sel.Opts |= XrdCmsSelect::Write; if (Arg.options & CmsPrepAddRequest::kYR_stage) {Sel.iovP = Arg.ioV; Sel.iovN = Arg.iovNum;} else {Sel.iovP = 0; Sel.iovN = 0; Sel.Opts |= XrdCmsSelect::Defer; } // Setup select data (note that prepare does not allow fast redirect) // Sel.InfoP = 0; // No fast redirects Sel.nmask = SMask_t(0); // We do not care what interface is being used. This may conflict with a // staging prepare but it's too complicated to handle at this point. // Sel.Opts |= static_cast(XrdNetIF::ifAny); // Check if co-location wanted relevant only when staging wanted // if (Arg.clPath && Sel.iovP) {XrdCmsSelect Scl(XrdCmsSelect::Peers, Arg.clPath, strlen(Arg.clPath)); Scl.iovP = 0; Scl.iovN = 0; Scl.InfoP = 0; Scl.nmask = SMask_t(0); DEBUGR("colocating " < 0) {Sched->Schedule((XrdJob *)&Arg, rc+time(0)); DEBUGR("coloc to " < 0) {if (!(Arg.options & CmsPrepAddRequest::kYR_stage)) return 0; Sched->Schedule((XrdJob *)&Arg, rc+time(0)); DEBUGR("prep delayed " < space // Respond: avail // maxfr = Meter.FreeSpace(tutil); // Do some debugging // DEBUGR(maxfr <<"MB free; " <(blen)); // Send the response // if (Arg.Request.rrCode != kYR_space) XrdCmsManager::Inform(mySpace.Hdr, buff, blen); else {xmsg[0].iov_base = (char *)&mySpace; xmsg[0].iov_len = sizeof(mySpace); xmsg[1].iov_base = buff; xmsg[1].iov_len = blen; mySpace.Hdr.datalen = htons(static_cast(blen)); Link->Send(xmsg, 2); } return 0; } /******************************************************************************/ /* d o _ S t a t e */ /******************************************************************************/ // State requests from a manager are rebroadcast to all relevant subscribers. // const char *XrdCmsNode::do_State(XrdCmsRRData &Arg) { EPNAME("do_State") struct iovec xmsg[2]; int rc, noResp = Arg.Request.modifier & CmsStateRequest::kYR_noresp; // Do some debugging // TRACER(Files,Arg.Path); // Process: state // Respond: have // isKnown = 1; // If we are a manager then check for the file in the local cache. Otherwise, // ask the underlying filesystem whether it has the file. // if (isMan) {if(!(Arg.Request.modifier = do_StateFWD(Arg))) return 0;} else if (!Config.DiskOK && !Config.asProxy()) return 0; else if (baseFS.Limit() && Arg.Request.modifier&CmsStateRequest::kYR_metaman) {XrdCmsPInfo pinfo; pinfo.rovec = NodeMask; if ((rc = baseFS.Exists(Arg,pinfo)) > 0) Arg.Request.modifier = rc; else return 0; } else if ((rc = baseFS.Exists(Arg.Path, -(Arg.PathLen-1))) > 0) Arg.Request.modifier = rc; else return 0; // Respond appropriately // if (Arg.Request.modifier && !noResp) {TRACER(Files,Arg.Path <<" responding have!"); xmsg[0].iov_base = (char *)&Arg.Request; xmsg[0].iov_len = sizeof(Arg.Request); xmsg[1].iov_base = Arg.Buff; xmsg[1].iov_len = Arg.Dlen; Arg.Request.rrCode = kYR_have; Arg.Request.modifier |= kYR_raw; Link->Send(xmsg, 2); } return 0; } /******************************************************************************/ /* d o _ S t a t e D F S */ /******************************************************************************/ void XrdCmsNode::do_StateDFS(XrdCmsBaseFR *rP, int rc) { EPNAME("StateDFs"); static const SMask_t allNodes(~0); CmsRRHdr Request = {rP->Sid, 0, (kXR_char)(rP->Mod | kYR_raw), 0}; XrdCmsSelect Sel(0, rP->Path, rP->PathLen); int isNew; // Do some debugging and record the hash code. // DEBUG((rP->Mod & CmsStateRequest::kYR_metaman ? "met " : "man ") <Mod) <Send(ioV, 3, bytes+sizeof(Arg.Request)); return 0; } /******************************************************************************/ /* d o _ S t a t s */ /******************************************************************************/ // We punt on stats requests as we have no way to export them anyway. // const char *XrdCmsNode::do_Stats(XrdCmsRRData &Arg) { static const unsigned short szLen = sizeof(kXR_unt32); static XrdSysMutex StatsData; static int statsz = 0; static int statln = 0; static char *statbuff = 0; static time_t statlast = 0; static kXR_unt32 theSize; struct iovec ioV[3] = {{(char *)&Arg.Request, sizeof(Arg.Request)}, {(char *)&theSize, sizeof(theSize)}, {0, 0} }; time_t tNow; // Allocate buffer if we do not have one // StatsData.Lock(); if (!statsz || !statbuff) {statsz = Cluster.Stats(0,0); statbuff = (char *)malloc(statsz); theSize = htonl(statsz); } // Check if only the size is wanted // if (Arg.Request.modifier & CmsStatsRequest::kYR_size) {ioV[1].iov_len = sizeof(theSize); Arg.Request.datalen = htons(szLen); Arg.Request.rrCode = kYR_data; Link->Send(ioV, 2); StatsData.UnLock(); return 0; } // Get full statistics if enough time has passed // tNow = time(0); if (statlast+9 >= tNow) {statln = Cluster.Stats(statbuff, statsz); statlast = tNow;} // Format result and send response // ioV[2].iov_base = statbuff; ioV[2].iov_len = statln; Arg.Request.datalen = htons(static_cast(szLen+statln)); Arg.Request.rrCode = kYR_data; Link->Send(ioV, 3); // All done // StatsData.UnLock(); return 0; } /******************************************************************************/ /* d o _ S t a t u s */ /******************************************************************************/ // the reset request is propagated to all of our managers. A special reset case // is sent when a subscribed supervisor adds a new node. This causes all cache // lines for the supervisor to be marked suspect. Status change requests are // propagated to upper-level managers only if the summary state has changed. // const char *XrdCmsNode::do_Status(XrdCmsRRData &Arg) { EPNAME("do_Status") const char *srvMsg, *stgMsg; int Stage = Arg.Request.modifier & CmsStatusRequest::kYR_Stage; int noStage = Arg.Request.modifier & CmsStatusRequest::kYR_noStage; int Resume = Arg.Request.modifier & CmsStatusRequest::kYR_Resume; int Suspend = Arg.Request.modifier & CmsStatusRequest::kYR_Suspend; int Reset = Arg.Request.modifier & CmsStatusRequest::kYR_Reset; int add2Activ, add2Stage, port; // Do some debugging // DEBUGR( (Reset ? "reset " : "") <<(Resume ? "resume " : (Suspend ? "suspend " : "")) <<(Stage ? "stage " : (noStage ? "nostage " : ""))); // Process reset requests. These are exclsuive to any other request // if (Reset) {XrdCmsManager::Reset(); // Propagate the reset to our managers Cache.Bounce(NodeMask, NodeID); // Now invalidate our cache lines } // Process stage/nostage // if ((Stage && isNoStage) || (noStage && !isNoStage)) if (noStage) {add2Stage = -1; isNoStage = 1; stgMsg="staging suspended";} else {add2Stage = 1; isNoStage = 0; stgMsg="staging resumed";} else {add2Stage = 0; stgMsg = 0;} // Process suspend/resume // if ((Resume && (isBad & isSuspend)) || (Suspend && !(isBad & isSuspend))) if (Suspend) {add2Activ = -1; isBad |= isSuspend; srvMsg="service suspended"; stgMsg = 0; } else {add2Activ = 1; isBad &= ~isSuspend; srvMsg="service resumed"; stgMsg = (isNoStage ? "(no staging)" : "(staging)"); port = ntohl(Arg.Request.streamid); if (port && port != netIF.Port()) {Lock(false); netIF.Port(port); UnLock(); DEBUGR("set data port to " <Truncate(Arg.Path, Size); // Return appropriate result // return (rc ? fsFail(Arg.Ident, "trunc", Arg.Path, rc) : 0); } /******************************************************************************/ /* d o _ T r y */ /******************************************************************************/ // Try requests from a manager indicate that we are being displaced and should // hunt for another manager. The request provides hints as to where to try. // Note that this method is no longer called but handled in XrdCmsProtocol! // const char *XrdCmsNode::do_Try(XrdCmsRRData &Arg) { EPNAME("do_Try") // Do somde debugging // DEBUGR(Arg.Path); // Add all the alternates to our alternate list // if (Manager) Manager->myMans->Add(&netID, Arg.Path, Config.PortTCP, myLevel); // Close the link and return an error // // Disc("redirected."); return ".redirected"; } /******************************************************************************/ /* d o _ U p d a t e */ /******************************************************************************/ const char *XrdCmsNode::do_Update(XrdCmsRRData &Arg) { // Process: update // Respond: status // CmsState.sendState(Link); return 0; } /******************************************************************************/ /* d o _ U s a g e */ /******************************************************************************/ // Usage requests from a manager are local to the cell and never propagated. // const char *XrdCmsNode::do_Usage(XrdCmsRRData &Arg) { // Process: usage // Respond: load // Report_Usage(Link); return 0; } /******************************************************************************/ /* R e p o r t _ U s a g e */ /******************************************************************************/ void XrdCmsNode::Report_Usage(XrdLink *lp) // Static! { EPNAME("Report_Usage") CmsLoadRequest myLoad = {{0, kYR_load, 0, 0}}; struct iovec xmsg[2]; char loadbuff[CmsLoadRequest::numLoad]; char respbuff[sizeof(loadbuff)+2+sizeof(int)+2], *bp = respbuff; int blen, maxfr, pcpu, pnet, pxeq, pmem, ppag, pdsk; // Respond: load // maxfr = Meter.Report(pcpu, pnet, pxeq, pmem, ppag, pdsk); loadbuff[CmsLoadRequest::cpuLoad] = static_cast(pcpu); loadbuff[CmsLoadRequest::netLoad] = static_cast(pnet); loadbuff[CmsLoadRequest::xeqLoad] = static_cast(pxeq); loadbuff[CmsLoadRequest::memLoad] = static_cast(pmem); loadbuff[CmsLoadRequest::pagLoad] = static_cast(ppag); loadbuff[CmsLoadRequest::dskLoad] = static_cast(pdsk); blen = XrdOucPup::Pack(&bp, loadbuff, sizeof(loadbuff)); blen += XrdOucPup::Pack(&bp, maxfr); myLoad.Hdr.datalen = htons(static_cast(blen)); xmsg[0].iov_base = (char *)&myLoad; xmsg[0].iov_len = sizeof(myLoad); xmsg[1].iov_base = respbuff; xmsg[1].iov_len = blen; if (lp) lp->Send(xmsg, 2); else XrdCmsManager::Inform("usage", xmsg, 2); // Do some debugging // DEBUG("cpu=" <