/******************************************************************************/ /* */ /* X r d C m s F i n d e r . c c */ /* */ /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "XrdVersion.hh" #include "XProtocol/YProtocol.hh" #include "XrdCms/XrdCmsClientConfig.hh" #include "XrdCms/XrdCmsClientMan.hh" #include "XrdCms/XrdCmsClientMsg.hh" #include "XrdCms/XrdCmsFinder.hh" #include "XrdCms/XrdCmsParser.hh" #include "XrdCms/XrdCmsResp.hh" #include "XrdCms/XrdCmsRRData.hh" #include "XrdCms/XrdCmsSecurity.hh" #include "XrdCms/XrdCmsTrace.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucBuffer.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucErrInfo.hh" #include "XrdOuc/XrdOucReqID.hh" #include "XrdOuc/XrdOucStream.hh" #include "XrdOuc/XrdOucUtils.hh" #include "XrdNet/XrdNetOpts.hh" #include "XrdNet/XrdNetSocket.hh" #include "XrdSfs/XrdSfsInterface.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysTimer.hh" #include "XrdSys/XrdSysPlatform.hh" #include "XrdSys/XrdSysPlugin.hh" using namespace XrdCms; class XrdInet; /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ namespace XrdCms { XrdSysError Say(0, "cms_"); XrdOucTrace Trace(&Say); XrdVERSIONINFODEF(myVersion,cmsclient,XrdVNUMBER,XrdVERSION); }; /******************************************************************************/ /* R e m o t e F i n d e r */ /******************************************************************************/ /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdCmsFinderRMT::XrdCmsFinderRMT(XrdSysLogger *lp, int whoami, int Port) : XrdCmsClient(XrdCmsClient::amRemote) { myManagers = 0; myManCount = 0; myManList = 0; myPort = Port; SMode = 0; sendID = 0; isMeta = whoami & IsMeta; isProxy = whoami & IsProxy; isTarget = whoami & IsTarget; savePath = 0; Say.logger(lp); } /******************************************************************************/ /* D e s t r u c t o r */ /******************************************************************************/ XrdCmsFinderRMT::~XrdCmsFinderRMT() { XrdCmsClientMan *mp, *nmp = myManagers; XrdOucTList *tp, *tpp = myManList; while((mp = nmp)) {nmp = mp->nextManager(); delete mp;} while((tp = tpp)) {tpp = tp->next; delete tp;} } /******************************************************************************/ /* C o n f i g u r e */ /******************************************************************************/ int XrdCmsFinderRMT::Configure(const char *cfn, char *Args, XrdOucEnv *envP) { XrdCmsClientConfig config; XrdCmsClientConfig::configHow How; XrdCmsClientConfig::configWhat What; XrdInet *netP; int Topts = IsRedir; // Establish what we will be configuring // if (isProxy) {How = XrdCmsClientConfig::configProxy; Topts |= IsProxy;} else if (isMeta) How = XrdCmsClientConfig::configMeta; else How = XrdCmsClientConfig::configNorm; What = (isTarget ? XrdCmsClientConfig::configSuper : XrdCmsClientConfig::configMan); // Establish the network interface that the caller must provide // if (!envP || !(netP = (XrdInet *)envP->GetPtr("XrdInet*"))) {Say.Emsg("Finder", "Network not defined; unable to connect to cmsd."); return 0; } XrdCmsClientMan::setNetwork(netP); XrdCmsClientMan::setConfig(cfn); XrdCmsSecurity::setSecFunc(envP->GetPtr("XrdSecGetProtocol*")); // Now call the configration object // if (config.Configure(cfn, What, How)) return 0; // Set configured values and start the managers // CMSPath = config.CMSPath; RepDelay = config.RepDelay; RepNone = config.RepNone; RepWait = config.RepWait; ConWait = config.ConWait; FwdWait = config.FwdWait; PrepWait = config.PrepWait; if (isProxy) {SMode = config.SModeP; StartManagers(config.PanList); config.PanList = 0; } else {SMode = config.SMode; StartManagers(config.ManList); config.ManList = 0; } // If we are tracing or if redirect monitoring is enabled, we will need // to save path information. // if (QTRACE(Redirect) || getenv("XRDMONRDR")) savePath = 1; // If we are a plain manager but have a meta manager then we must start // a responder (that we will hide) to pass through the port number. // if (!isMeta && !isTarget && config.haveMeta) {XrdCmsFinderTRG *Rsp = new XrdCmsFinderTRG(Say.logger(),Topts,myPort); return Rsp->RunAdmin(CMSPath, config.myVNID); } // All done // return 1; } /******************************************************************************/ /* F o r w a r d */ /******************************************************************************/ int XrdCmsFinderRMT::Forward(XrdOucErrInfo &Resp, const char *cmd, const char *arg1, const char *arg2, XrdOucEnv *Env1, XrdOucEnv *Env2) { static XrdSysMutex fwdMutex; static struct timeval fwdClk = {time(0),0}; static const int xNum = 12; XrdCmsClientMan *Manp; XrdCmsRRData Data; unsigned int iMan; int iovcnt, is2way, doAll = 0, opQ1Len = 0, opQ2Len = 0; char Work[xNum*12]; struct iovec xmsg[xNum]; // Encode the request as a redirector command // if ((is2way = (*cmd == '+'))) cmd++; if (!strcmp("chmod", cmd)) Data.Request.rrCode = kYR_chmod; else if (!strcmp("mkdir", cmd)) Data.Request.rrCode = kYR_mkdir; else if (!strcmp("mkpath",cmd)) Data.Request.rrCode = kYR_mkpath; else if (!strcmp("mv", cmd)){Data.Request.rrCode = kYR_mv; doAll=1;} else if (!strcmp("rm", cmd)){Data.Request.rrCode = kYR_rm; doAll=1;} else if (!strcmp("rmdir", cmd)){Data.Request.rrCode = kYR_rmdir; doAll=1;} else if (!strcmp("trunc", cmd)) Data.Request.rrCode = kYR_trunc; else {Say.Emsg("Finder", "Unable to forward '", cmd, "'."); Resp.setErrInfo(EINVAL, "Internal error processing file."); return SFS_ERROR; } // Fill out the RR data structure // Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : ""); Data.Path = (char *)arg1; Data.Mode = (char *)arg2; Data.Path2 = (char *)arg2; Data.Opaque = (Env1 ? Env1->Env(opQ1Len) : 0); Data.Opaque2 = (Env2 ? Env2->Env(opQ2Len) : 0); // Pack the arguments // if (!(iovcnt = Parser.Pack(int(Data.Request.rrCode), &xmsg[1], &xmsg[xNum], (char *)&Data, Work))) {Resp.setErrInfo(EINVAL, "Internal error processing file."); return SFS_ERROR; } // Insert the header into the stream // Data.Request.streamid = 0; Data.Request.modifier = 0; xmsg[0].iov_base = (char *)&Data.Request; xmsg[0].iov_len = sizeof(Data.Request); // This may be a 2way message. If so, use the longer path. // if (is2way) return send2Man(Resp, (arg1 ? arg1 : "/"), xmsg, iovcnt+1); // Check if we have exceeded the maximum rate for requests. If we have, we // will wait to repace the stream so as to not burden the cmsd. // if (doAll && FwdWait) {struct timeval nowClk; time_t Window; fwdMutex.Lock(); gettimeofday(&nowClk, 0); fwdClk.tv_sec = nowClk.tv_sec - fwdClk.tv_sec; fwdClk.tv_usec = nowClk.tv_usec - fwdClk.tv_usec; if (fwdClk.tv_usec < 0) {fwdClk.tv_sec--; fwdClk.tv_usec += 1000000;} Window = fwdClk.tv_sec*1000 + fwdClk.tv_usec/1000; if (Window < FwdWait) XrdSysTimer::Wait(FwdWait - Window); fwdClk = nowClk; fwdMutex.UnLock(); } // Select the right manager for this request // if (!(Manp = SelectManager(Resp, (arg1 ? arg1 : "/")))) return ConWait; // Send message and simply wait for the reply // if (Manp->Send(iMan, xmsg, iovcnt+1)) {if (doAll) {Data.Request.modifier |= kYR_dnf; Inform(Manp, xmsg, iovcnt+1); } return 0; } // Indicate client should retry later // Resp.setErrInfo(RepDelay, ""); return RepDelay; } /******************************************************************************/ /* I n f o r m */ /******************************************************************************/ void XrdCmsFinderRMT::Inform(XrdCmsClientMan *xman, struct iovec xmsg[], int xnum) { XrdCmsClientMan *Womp, *Manp; unsigned int iMan; // Make sure we are configured // if (!myManagers) {Say.Emsg("Finder", "SelectManager() called prior to Configure()."); return; } // Start at the beginning (we will avoid the previously selected one) // Womp = Manp = myManagers; do {if (Manp != xman && Manp->isActive()) Manp->Send(iMan, xmsg, xnum); } while((Manp = Manp->nextManager()) != Womp); } /******************************************************************************/ /* L o c a t e */ /******************************************************************************/ int XrdCmsFinderRMT::Locate(XrdOucErrInfo &Resp, const char *path, int flags, XrdOucEnv *Env) { static const int xNum = 12; XrdCmsRRData Data; int n, iovcnt; char Work[xNum*12]; struct iovec xmsg[xNum]; char *triedRC, *affmode; // Fill out the RR data structure // Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : ""); Data.Path = (char *)path; Data.Opaque = (Env ? Env->Env(n) : 0); Data.Avoid = (Env ? Env->Get("tried") : 0); // Set options and command // if (flags & SFS_O_LOCATE) {bool doAll = (flags & SFS_O_FORCE) != 0; if (flags & SFS_O_LOCAL) return LocLocal(Resp, Env); Data.Request.rrCode = kYR_locate; Data.Opts = (flags & SFS_O_NOWAIT ? CmsLocateRequest::kYR_asap : 0) | (flags & SFS_O_RESET ? CmsLocateRequest::kYR_refresh : 0); if (Resp.getUCap() & XrdOucEI::uPrip) Data.Opts |= CmsLocateRequest::kYR_prvtnet; if (Resp.getUCap() & XrdOucEI::uIPv4) {Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64 || doAll ? CmsLocateRequest::kYR_retipv46 : 0); } else { Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64 || doAll ? CmsLocateRequest::kYR_retipv64 : CmsLocateRequest::kYR_retipv6); } if (flags & SFS_O_HNAME) Data.Opts |= CmsLocateRequest::kYR_retname; if (flags & SFS_O_RAWIO) Data.Opts |= CmsLocateRequest::kYR_retuniq; if (doAll) Data.Opts |= CmsLocateRequest::kYR_listall; } else { Data.Request.rrCode = kYR_select; if (flags & SFS_O_TRUNC) Data.Opts = CmsSelectRequest::kYR_trunc; else if (flags & SFS_O_CREAT) { Data.Opts = CmsSelectRequest::kYR_create; if (flags & SFS_O_REPLICA) Data.Opts|= CmsSelectRequest::kYR_replica; } else if (flags & SFS_O_STAT) Data.Opts = CmsSelectRequest::kYR_stat; else Data.Opts = 0; Data.Opts |= (flags & (SFS_O_WRONLY | SFS_O_RDWR) ? CmsSelectRequest::kYR_write : CmsSelectRequest::kYR_read); if (flags & SFS_O_META) Data.Opts |= CmsSelectRequest::kYR_metaop; if (flags & SFS_O_NOWAIT) Data.Opts |= CmsSelectRequest::kYR_online; if (flags & SFS_O_RESET) Data.Opts |= CmsSelectRequest::kYR_refresh; if (flags & SFS_O_MULTIW) Data.Opts |= CmsSelectRequest::kYR_mwfiles; if (Env && (affmode = Env->Get("cms.aff"))) { if (*affmode == 'n') Data.Opts |= CmsSelectRequest::kYR_aNone; else if (*affmode == 'S') Data.Opts |= CmsSelectRequest::kYR_aStrict; else if (*affmode == 's') Data.Opts |= CmsSelectRequest::kYR_aStrong; else if (*affmode == 'w') Data.Opts |= CmsSelectRequest::kYR_aWeak; } if (Resp.getUCap() & XrdOucEI::uPrip) Data.Opts |= CmsSelectRequest::kYR_prvtnet; if (Resp.getUCap() & XrdOucEI::uIPv4) {Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64 ? CmsSelectRequest::kYR_retipv46 : 0); } else { Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64 ? CmsSelectRequest::kYR_retipv64 : CmsSelectRequest::kYR_retipv6); } if (Data.Avoid && Env && (triedRC = Env->Get("triedrc"))) {char *comma = rindex(triedRC, ','); if (comma) triedRC = comma+1; if (!strcmp(triedRC, "enoent")) Data.Opts |= CmsSelectRequest::kYR_tryMISS; else if (!strcmp(triedRC, "ioerr")) Data.Opts |= CmsSelectRequest::kYR_tryIOER; else if (!strcmp(triedRC, "fserr")) Data.Opts |= CmsSelectRequest::kYR_tryFSER; else if (!strcmp(triedRC, "srverr")) Data.Opts |= CmsSelectRequest::kYR_trySVER; else if (!strcmp(triedRC, "resel")) Data.Opts |= CmsSelectRequest::kYR_tryRSEL; } } // Pack the arguments // if (!(iovcnt = Parser.Pack(int(Data.Request.rrCode), &xmsg[1], &xmsg[xNum], (char *)&Data, Work))) {Resp.setErrInfo(EINVAL, "Internal error processing file."); return SFS_ERROR; } // Insert the header into the stream // Data.Request.streamid = 0; Data.Request.modifier = 0; xmsg[0].iov_base = (char *)&Data.Request; xmsg[0].iov_len = sizeof(Data.Request); // Send the 2way message // return send2Man(Resp, path, xmsg, iovcnt+1); } /******************************************************************************/ /* L o c L o c a l */ /******************************************************************************/ int XrdCmsFinderRMT::LocLocal(XrdOucErrInfo &Resp, XrdOucEnv *Env) { XrdCmsClientMan *Womp, *Manp; XrdOucBuffer *xBuff = 0; char *mBeg, *mBuff, mStat; int mBlen, n; // If we have no managers or no role, we are not clustered // if (!myManagers) {Resp.setErrInfo(0, ""); return SFS_DATA; } // Get where to start and where to put the information // Womp = Manp = myManagers; mBeg = mBuff = Resp.getMsgBuff(mBlen); // Check if we can use the internal buffer or need to get an external buffer // n = 8 + (myManCount * (256+6+2)); if (n > mBlen) {mBeg = mBuff = (char *)malloc(n); if (!mBeg) {Resp.setErrInfo(ENOMEM, "Insufficient memory."); return SFS_ERROR; } xBuff = new XrdOucBuffer(mBeg, n); mBlen = n; } // Make sure we have enough space to continue // if (mBlen < 1024) {Resp.setErrInfo(EINVAL, "Invalid role."); return SFS_ERROR; } // List the status of each manager // do {if (Manp->isActive()) mStat = (Manp->Suspended() ? 's' : 'c'); else mStat = 'd'; n = snprintf(mBuff, mBlen, "%s:%d/%c ", Manp->Name(), Manp->manPort(), mStat); mBuff += n; mBlen -= n; } while((Manp = Manp->nextManager()) != Womp && mBlen > 0); // We should not have overrun the buffer; if we did declare failure // if (mBlen < 0) {Resp.setErrInfo(EINVAL, "Internal processing error."); if (xBuff) xBuff->Recycle(); return SFS_ERROR; } // Set the final result // n = mBuff - mBeg; if (!xBuff) Resp.setErrCode(n); else {xBuff->SetLen(n); Resp.setErrInfo(n, xBuff); } // All done // return SFS_DATA; } /******************************************************************************/ /* P r e p a r e */ /******************************************************************************/ int XrdCmsFinderRMT::Prepare(XrdOucErrInfo &Resp, XrdSfsPrep &pargs, XrdOucEnv *envP) { EPNAME("Prepare") static const int xNum = 16; static XrdSysMutex prepMutex; XrdCmsRRData Data; XrdOucTList *tp, *op; XrdCmsClientMan *Manp = 0; unsigned int iMan; int iovcnt = 0, NoteLen, n; char Prty[1032], *NoteNum = 0, *colocp = 0; char Work[xNum*12]; struct iovec xmsg[xNum]; // Prefill the RR data structure and iovec // Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : ""); Data.Reqid = pargs.reqid; Data.Request.streamid = 0; Data.Request.modifier = 0; xmsg[0].iov_base = (char *)&Data.Request; xmsg[0].iov_len = sizeof(Data.Request); // Check for a cancel request // if (!(tp = pargs.paths)) {Data.Request.rrCode = kYR_prepdel; if (!(iovcnt = Parser.Pack(kYR_prepdel, &xmsg[1], &xmsg[xNum], (char *)&Data, Work))) {Resp.setErrInfo(EINVAL, "Internal error processing file."); return SFS_ERROR; } if (!(Manp = SelectManager(Resp, 0))) return ConWait; if (Manp->Send(iMan, (const struct iovec *)&xmsg, iovcnt+1)) return 0; DEBUG("Finder: Failed to send prepare cancel to " <Name() <<" reqid=" <next) {colocp = Prty + n; strlcpy(colocp+1, pargs.paths->text, sizeof(Prty)-n-1); } Data.Prty = Prty; // Distribute out paths to the various managers // Data.Request.rrCode = kYR_prepadd; op = pargs.oinfo; while(tp) {if (NoteNum) sprintf(NoteNum, "%d", tp->val); Data.Path = tp->text; if (op) {Data.Opaque = op->text; op = op->next;} else Data.Opaque = 0; if (!(iovcnt = Parser.Pack(kYR_prepadd, &xmsg[1], &xmsg[xNum], (char *)&Data, Work))) break; if (!(Manp = SelectManager(Resp, tp->text))) break; DEBUG("Finder: Sending " <Name() <<' ' <Send(iMan, (const struct iovec *)&xmsg, iovcnt+1)) break; if ((tp = tp->next)) {prepMutex.Lock(); XrdSysTimer::Wait(PrepWait); prepMutex.UnLock();} if (colocp) {Data.Request.modifier |= CmsPrepAddRequest::kYR_coloc; *colocp = ' '; colocp = 0; } } // Check if all went well // if (NoteNum) free(Data.Notify); if (!tp) return 0; // Decode the error condition // if (!Manp) return ConWait; if (!iovcnt) {Say.Emsg("Finder", "Unable to send prepadd; too much data."); Resp.setErrInfo(EINVAL, "Internal error processing file."); return SFS_ERROR; } Resp.setErrInfo(RepDelay, ""); DEBUG("Finder: Failed to send prepare to " <<(Manp ? Manp->Name() : "?") <<" reqid=" <isActive()) return (Manp->Suspended() ? 0 : Manp); } while((Manp = Manp->nextManager()) != Womp); // All managers are dead // SelectManFail(Resp); return (XrdCmsClientMan *)0; } /******************************************************************************/ /* S e l e c t M a n F a i l */ /******************************************************************************/ void XrdCmsFinderRMT::SelectManFail(XrdOucErrInfo &Resp) { EPNAME("SelectManFail") static time_t nextMsg = 0; time_t now; // All servers are dead, indicate so every minute // now = time(0); myData.Lock(); if (nextMsg < now) {nextMsg = now + 60; myData.UnLock(); Say.Emsg("Finder", "All managers are dysfunctional."); } else myData.UnLock(); Resp.setErrInfo(ConWait, ""); TRACE(Redirect, "user=" <Suspended()) return ConWait; // Allocate a message object. There is only a fixed number of these and if // all of them are in use, th client has to wait to prevent over-runs. // if (!(mp = XrdCmsClientMsg::Alloc(&Resp))) {Resp.setErrInfo(RepDelay, ""); TRACE(Redirect, Resp.getErrUser() <<" no more msg objects; path=" <streamid = mp->ID(); if (savePath) Resp.setErrData(path); else Resp.setErrData(0); // Send message and simply wait for the reply (msg object is locked via Alloc) // if (!Manp->Send(iMan, xmsg, xnum) || (mp->Wait4Reply(Manp->waitTime()))) {mp->Recycle(); retc = Manp->whatsUp(Resp.getErrUser(), path, iMan); Resp.setErrInfo(retc, ""); return retc; } // A reply was received; process as appropriate // retc = mp->getResult(); if (retc == SFS_STARTED) retc = Manp->delayResp(Resp); else if (retc == SFS_STALL) retc = Resp.getErrInfo(); // All done // mp->Recycle(); return retc; } /******************************************************************************/ /* S t a r t M a n a g e r s */ /******************************************************************************/ void *XrdCmsStartManager(void *carg) {XrdCmsClientMan *mp = (XrdCmsClientMan *)carg; return mp->Start(); } void *XrdCmsStartResp(void *carg) {XrdCmsResp::Reply(); return (void *)0; } int XrdCmsFinderRMT::StartManagers(XrdOucTList *theManList) { XrdOucTList *tp; XrdCmsClientMan *mp, *firstone = 0; int i = 0; pthread_t tid; char buff[128]; // Save the proper manager list for later reporting // myManList = theManList; // Clear manager table // memset((void *)myManTable, 0, sizeof(myManTable)); // For each manager, start a thread to handle it // tp = theManList; while(tp && i < MaxMan) {mp = new XrdCmsClientMan(tp->text,tp->val,ConWait,RepNone,RepWait,RepDelay); myManTable[i] = mp; if (myManagers) mp->setNext(myManagers); else firstone = mp; myManagers = mp; if (XrdSysThread::Run(&tid,XrdCmsStartManager,(void *)mp,0,mp->Name())) Say.Emsg("Finder", errno, "start manager"); tp = tp->next; i++; } // Check if we exceeded maximum manager count // if (tp) while(tp) {Say.Emsg("Config warning: too many managers;",tp->text,"ignored."); tp = tp->next; } // Make this a circular chain // if (firstone) firstone->setNext(myManagers); // Indicate how many managers have been started // sprintf(buff, "%d manager(s) started.", i); Say.Say("Config ", buff); myManCount = i; // Now Start that many callback threads // while(i--) if (XrdSysThread::Run(&tid,XrdCmsStartResp,(void *)0,0,"async callback")) Say.Emsg("Finder", errno, "start callback manager"); // All done // return 0; } /******************************************************************************/ /* S p a c e */ /******************************************************************************/ int XrdCmsFinderRMT::Space(XrdOucErrInfo &Resp, const char *path, XrdOucEnv *eP) { static const int xNum = 4; XrdCmsRRData Data; int iovcnt; char Work[xNum*12]; struct iovec xmsg[xNum]; // Fill out the RR data structure // Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : ""); Data.Path = (char *)path; // Pack the arguments // if (!(iovcnt = Parser.Pack(kYR_statfs, &xmsg[1], &xmsg[xNum], (char *)&Data, Work))) {Resp.setErrInfo(EINVAL, "Internal error processing file."); return SFS_ERROR; } // Insert the header into the stream // Data.Request.rrCode = kYR_statfs; Data.Request.streamid = 0; Data.Request.modifier = (eP && eP->Get("cms.qvfs") ? CmsStatfsRequest::kYR_qvfs : 0); xmsg[0].iov_base = (char *)&Data.Request; xmsg[0].iov_len = sizeof(Data.Request); // Send the 2way message // return send2Man(Resp, path, xmsg, iovcnt+1); } /******************************************************************************/ /* V C h e c k */ /******************************************************************************/ bool XrdCmsFinderRMT::VCheck(XrdVersionInfo &urVersion) { return XrdSysPlugin::VerCmp(urVersion, myVersion); } /******************************************************************************/ /* T a r g e t F i n d e r */ /******************************************************************************/ /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdCmsFinderTRG::XrdCmsFinderTRG(XrdSysLogger *lp, int whoami, int port, XrdOss *theSS) : XrdCmsClient(XrdCmsClient::amTarget) { isRedir = whoami & IsRedir; isProxy = whoami & IsProxy; SS = theSS; CMSPath = 0; Login = 0; myManList = 0; CMSp = new XrdOucStream(&Say); Active = 0; myPort = port; resMax = -1; resCur = 0; Say.logger(lp); } /******************************************************************************/ /* D e s t r u c t o r */ /******************************************************************************/ XrdCmsFinderTRG::~XrdCmsFinderTRG() { XrdOucTList *tp, *tpp = myManList; if (CMSp) delete CMSp; if (Login) free(Login); while((tp = tpp)) {tpp = tp->next; delete tp;} } /******************************************************************************/ /* A d d e d */ /******************************************************************************/ void XrdCmsFinderTRG::Added(const char *path, int Pend) { char *data[4]; int dlen[4]; // Set up to notify the cluster that a file has been added // data[0] = (char *)"newfn "; dlen[0] = 6; data[1] = (char *)path; dlen[1] = strlen(path); if (Pend) {data[2] = (char *)" p\n"; dlen[2] = 3;} else {data[2] = (char *)"\n"; dlen[2] = 1;} data[3] = 0; dlen[3] = 0; // Now send the notification // myData.Lock(); if (Active && CMSp->Put((const char **)data, (const int *)dlen)) {CMSp->Close(); Active = 0;} myData.UnLock(); } /******************************************************************************/ /* C o n f i g u r e */ /******************************************************************************/ void *XrdCmsStartRsp(void *carg) {XrdCmsFinderTRG *mp = (XrdCmsFinderTRG *)carg; return mp->Start(); } int XrdCmsFinderTRG::Configure(const char *cfn, char *Ags, XrdOucEnv *envP) { XrdCmsClientConfig config; XrdCmsClientConfig::configWhat What; // Establish what we will be configuring // What = (isRedir ? XrdCmsClientConfig::configSuper : XrdCmsClientConfig::configServer); // Steal the manlist as we might have to report it // if (isProxy) {myManList = config.PanList; config.PanList = 0;} else {myManList = config.ManList; config.ManList = 0;} // Set the error dest and simply call the configration object and if // if (config.Configure(cfn, What, XrdCmsClientConfig::configNorm)) return 0; // Run the Admin thread. Note that unlike FinderRMT, we do not extract the // security function pointer or the network object pointer from the // environment as we don't need these at all. // return RunAdmin(config.CMSPath, config.myVNID); } /******************************************************************************/ /* L o c a t e */ /******************************************************************************/ int XrdCmsFinderTRG::Locate(XrdOucErrInfo &Resp, const char *path, int flags, XrdOucEnv *Env) { char *mBuff; int mBlen, n; // We only support locate on the local configuration // if (!(flags & SFS_O_LOCATE) || !(flags & SFS_O_LOCAL)) {Resp.setErrInfo(EINVAL, "Invalid locate option for target config."); return SFS_ERROR; } // Get the buffer for the result // mBuff = Resp.getMsgBuff(mBlen); // Return information // n = snprintf(mBuff, mBlen, "localhost:0/%c", (Active ? 'a' : 'd')); Resp.setErrCode(n); return SFS_DATA; } /******************************************************************************/ /* R e l e a s e */ /******************************************************************************/ int XrdCmsFinderTRG::Release(int rNum) { int resOld; // Lock the variables of interest // rrMutex.Lock(); resOld = resCur; // If reserve/release not enabled or we have a non-positive value, return // if (resMax < 0 || rNum <= 0) {rrMutex.UnLock(); return resOld;} // Adjust resource and check if we can resume // resCur += rNum; if (resCur > resMax) resCur = resMax; if (resOld < 1 && resCur > 0) Resume(0); // All done // resOld = resCur; rrMutex.UnLock(); return resOld; } /******************************************************************************/ /* R e m o v e d */ /******************************************************************************/ void XrdCmsFinderTRG::Removed(const char *path) { char *data[4]; int dlen[4]; // Set up to notify the cluster that a file has been removed // data[0] = (char *)"rmdid "; dlen[0] = 6; data[1] = (char *)path; dlen[1] = strlen(path); data[2] = (char *)"\n"; dlen[2] = 1; data[3] = 0; dlen[3] = 0; // Now send the notification // myData.Lock(); if (Active && CMSp->Put((const char **)data, (const int *)dlen)) {CMSp->Close(); Active = 0;} myData.UnLock(); } /******************************************************************************/ /* R e s e r v e */ /******************************************************************************/ int XrdCmsFinderTRG::Reserve(int rNum) { int resOld; // Lock the variables of interest // rrMutex.Lock(); resOld = resCur; // If reserve/release not enabled or we have a non-positive value, return // if (resMax < 0 || rNum <= 0) {rrMutex.UnLock(); return resOld;} // Adjust resource and check if we can suspend // resCur -= rNum; if (resOld > 0 && resCur < 1) Suspend(0); // All done // resOld = resCur; rrMutex.UnLock(); return resOld; } /******************************************************************************/ /* R e s o u r c e */ /******************************************************************************/ int XrdCmsFinderTRG::Resource(int rNum) { int resOld; // Lock the variables of interest // rrMutex.Lock(); resOld = (resMax < 0 ? 0 : resMax); // If we have a non-positive value, return // if (rNum <= 0) {rrMutex.UnLock(); return resOld;} // Set the resource and adjust the current value as needed // resMax = rNum; if (resCur > resMax) resCur = resMax; // All done // rrMutex.UnLock(); return resOld; } /******************************************************************************/ /* R e s u m e */ /******************************************************************************/ void XrdCmsFinderTRG::Resume(int Perm) { // 1234567890 static const char *rPerm[2] = {"resume\n", 0}; static const char *rTemp[2] = {"resume t\n", 0}; static int lPerm[2] = { 7, 0}; static int lTemp[2] = { 9, 0}; // Now send the notification // myData.Lock(); if (Active && CMSp->Put((const char **)(Perm ? rPerm : rTemp), (const int *) (Perm ? lPerm : lTemp))) {CMSp->Close(); Active = 0;} myData.UnLock(); } /******************************************************************************/ /* S u s p e n d */ /******************************************************************************/ void XrdCmsFinderTRG::Suspend(int Perm) { // 1234567890 static const char *sPerm[2] = {"suspend\n", 0}; static const char *sTemp[2] = {"suspend t\n", 0}; static int lPerm[2] = { 8, 0}; static int lTemp[2] = {10, 0}; // Now send the notification // if (Active && CMSp->Put((const char **)(Perm ? sPerm : sTemp), (const int *) (Perm ? lPerm : lTemp))) {CMSp->Close(); Active = 0;} myData.UnLock(); } /******************************************************************************/ /* R u n A d m i n */ /******************************************************************************/ int XrdCmsFinderTRG::RunAdmin(char *Path, const char *vnid) { const char *lFmt; pthread_t tid; char buff [512]; // Make sure we have a path to the cmsd // if (!(CMSPath = Path)) {Say.Emsg("Config", "Unable to determine cms admin path"); return 0;} // Construct the login line // lFmt = (vnid ? "login %c %d port %d vnid %s\n" : "login %c %d port %d\n"); snprintf(buff, sizeof(buff), lFmt, (isProxy ? 'P' : 'p'), static_cast(getpid()), myPort, vnid); Login = strdup(buff); // Start a thread to connect with the local cmsd // if (XrdSysThread::Run(&tid, XrdCmsStartRsp, (void *)this, 0, "cms i/f")) {Say.Emsg("Config", errno, "start cmsd interface"); return 0;} return 1; } /******************************************************************************/ /* S t a r t */ /******************************************************************************/ void *XrdCmsFinderTRG::Start() { XrdCmsRRData Data; // First step is to connect to the local cmsd. We also establish a binary // read stream (old olbd's never used it) to get requests that can only be // executed by the xrootd (e.g., rm and mv). // while(1) {do {Hookup(); // Login to cmsd // myData.Lock(); CMSp->Put(Login); myData.UnLock(); // Get the FD for this connection // Data.Routing = CMSp->FDNum(); // Put up a read to process local requests. Sould the cmsd die, // we will notice and try to reconnect. // while(recv(Data.Routing, &Data.Request, sizeof(Data.Request), MSG_WAITALL) > 0 && Process(Data)) {} break; } while(1); // The cmsd went away // myData.Lock(); CMSp->Close(); Active = 0; myData.UnLock(); Say.Emsg("Finder", "Lost contact with cmsd via", CMSPath); XrdSysTimer::Wait(10*1000); } // We should never get here // return (void *)0; } /******************************************************************************/ /* V C h e c k */ /******************************************************************************/ bool XrdCmsFinderTRG::VCheck(XrdVersionInfo &urVersion) { return XrdSysPlugin::VerCmp(urVersion, myVersion); } /******************************************************************************/ /* P r i v a t e M e t h o d s */ /******************************************************************************/ /******************************************************************************/ /* H o o k u p */ /******************************************************************************/ void XrdCmsFinderTRG::Hookup() { struct stat buf; XrdNetSocket Sock(&Say); int opts = 0, tries = 6; // Wait for the cmsd path to be created // while(stat(CMSPath, &buf)) {if (!tries--) {Say.Emsg("Finder", "Waiting for cms path", CMSPath); tries=6;} XrdSysTimer::Wait(10*1000); } // We can now try to connect // tries = 0; while(Sock.Open(CMSPath, -1, opts) < 0) {if (!tries--) {opts = XRDNET_NOEMSG; tries = 6; } else if (!tries) opts = 0; XrdSysTimer::Wait(10*1000); }; // Transfer the socket FD to a stream // myData.Lock(); Active = 1; CMSp->Attach(Sock.Detach()); myData.UnLock(); // Tell the world // Say.Emsg("Finder", "Connected to cmsd via", CMSPath); } /******************************************************************************/ /* P r o c e s s */ /******************************************************************************/ int XrdCmsFinderTRG::Process(XrdCmsRRData &Data) { EPNAME("Process") static const int maxReqSize = 16384; static int Wmsg = 255; const char *myArgs, *myArgt, *Act; char buff[16]; int rc; // Decode the length and get the rest of the data // Data.Dlen = static_cast(ntohs(Data.Request.datalen)); if (!(Data.Dlen)) {myArgs = myArgt = 0;} else {if (Data.Dlen > maxReqSize) {Say.Emsg("Finder","Request args too long from local cmsd"); return 0; } if ((!Data.Buff || Data.Blen < Data.Dlen) && !Data.getBuff(Data.Dlen)) {Say.Emsg("Finder", "No buffers to serve local cmsd"); return 0; } if (recv(Data.Routing,Data.Buff,Data.Dlen,MSG_WAITALL) != Data.Dlen) return 0; myArgs = Data.Buff; myArgt = Data.Buff + Data.Dlen; } // Process the request as needed. We ignore opaque information for now. // If the request is not valid is could be that we lost sync on the connection. // The only way to recover is to tear it down and start over. // switch(Data.Request.rrCode) {case kYR_mv: Act = "mv"; break; case kYR_rm: Act = "rm"; Data.Path2 = (char *)""; break; case kYR_rmdir: Act = "rmdir"; Data.Path2 = (char *)""; break; default: sprintf(buff, "%d", Data.Request.rrCode); Say.Emsg("Finder","Local cmsd sent an invalid request -",buff); return 0; } // Parse the arguments // if (!myArgs || !Parser.Parse(int(Data.Request.rrCode),myArgs,myArgt,&Data)) {Say.Emsg("Finder", "Local cmsd sent a badly formed",Act,"request"); return 1; } DEBUG("cmsd requested " <Rename(Data.Path, Data.Path2); break; case kYR_rm: rc = SS->Unlink(Data.Path); break; case kYR_rmdir: rc = SS->Remdir(Data.Path); break; default: rc = 0; break; } if (rc) Say.Emsg("Finder", rc, Act, Data.Path); // All Done // return 1; }