/******************************************************************************/ /* */ /* X r d C m s C l i e n t M a n . c c */ /* */ /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include "XrdCms/XrdCmsClientMan.hh" #include "XrdCms/XrdCmsClientMsg.hh" #include "XrdCms/XrdCmsLogin.hh" #include "XrdCms/XrdCmsTrace.hh" #include "XrdSfs/XrdSfsInterface.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysTimer.hh" #include "Xrd/XrdInet.hh" #include "Xrd/XrdLink.hh" using namespace XrdCms; /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ XrdOucBuffPool XrdCmsClientMan::BuffPool(XrdOucEI::Max_Error_Len, 65536, 1, 16); XrdInet *XrdCmsClientMan::Network = 0; char XrdCmsClientMan::doDebug = 0; const char *XrdCmsClientMan::ConfigFN = 0; XrdSysMutex XrdCmsClientMan::manMutex; /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdCmsClientMan::XrdCmsClientMan(char *host, int port, int cw, int nr, int rw, int rd) : syncResp(0) { static XrdSysMutex initMutex; static int Instance = 0; char *dot; Host = strdup(host); if ((dot = index(Host, '.'))) {*dot = '\0'; HPfx = strdup(Host); *dot = '.';} else HPfx = strdup(Host); Port = port; Link = 0; Active = 0; Silent = 0; Suspend = 1; RecvCnt = 0; nrMax = nr; NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len); repWMax = rw; repWait = 0; minDelay= rd; maxDelay= rd*3; chkCount= chkVal; lastUpdt= lastTOut = time(0); Next = 0; manInst = 1; // Compute dally value // dally = cw / 2 - 1; if (dally < 3) dally = 3; else if (dally > 10) dally = 10; // Provide a unique mask number for this manager // initMutex.Lock(); manMask = 1<Close(); if (Host) free(Host); if (HPfx) free(HPfx); if (NetBuff) NetBuff->Recycle(); } /******************************************************************************/ /* d e l a y R e s p */ /******************************************************************************/ int XrdCmsClientMan::delayResp(XrdOucErrInfo &Resp) { XrdCmsResp *rp; int msgid; // Obtain the message ID // if (!(msgid = Resp.getErrInfo())) {Say.Emsg("Manager", Host, "supplied invalid waitr msgid"); Resp.setErrInfo(EILSEQ, "redirector protocol error"); syncResp.Post(); return SFS_ERROR; } // Allocate a delayed response object // if (!(rp = XrdCmsResp::Alloc(&Resp, msgid))) {Say.Emsg("Manager",ENOMEM,"allocate resp object for",Resp.getErrUser()); Resp.setErrInfo(0, "0"); syncResp.Post(); return SFS_STALL; } // Add this object to our delayed response queue. If the manager bounced then // purge all of the pending repsonses to avoid sending wrong ones. // if (msgid < maxMsgID) RespQ.Purge(); maxMsgID = msgid; RespQ.Add(rp); // Tell client to wait for response. The semaphore post allows the manager // to get the next message from the cmsd. This prevents us from getting the // delayed response before the response object is added to the queue. // Resp.setErrInfo(0, ""); syncResp.Post(); return SFS_STARTED; } /******************************************************************************/ /* S e n d */ /******************************************************************************/ int XrdCmsClientMan::Send(unsigned int &iMan, char *msg, int mlen) { int allok = 0; // Determine message length // if (!mlen) mlen = strlen(msg); // Send the request // myData.Lock(); iMan = manInst; if (Active) {if (Link) {if (!(allok = Link->Send(msg, mlen) > 0)) {Active = 0; Link->Close(1); manInst++; } else SendCnt++; } } myData.UnLock(); // All done // return allok; } /******************************************************************************/ int XrdCmsClientMan::Send(unsigned int &iMan, const struct iovec *iov, int iovcnt, int iotot) { int allok = 0; // Send the request // myData.Lock(); iMan = manInst; if (Active) {if (Link) {if (!(allok = Link->Send(iov, iovcnt, iotot) > 0)) {Active = 0; Link->Close(1); manInst++; } else SendCnt++; } } myData.UnLock(); // All done // return allok; } /******************************************************************************/ /* S t a r t */ /******************************************************************************/ void *XrdCmsClientMan::Start() { // First step is to connect to the manager // do {Hookup(); // Now simply start receiving messages on the stream. When we get a // respwait reply then we must be assured that the object representing // the request is added to the queue before the actual reply arrives. // We do this by waiting on syncResp which is posted once the request // object is fully processed. The actual response associated with the // respwait is synchronized during the callback phase since the client // must receive the respwait before the subsequent response. // while(Receive()) if (Response.modifier & CmsResponse::kYR_async) relayResp(); else if (Response.rrCode == kYR_status) setStatus(); else if (XrdCmsClientMsg::Reply(HPfx, Response, NetBuff)) {if (Response.rrCode == kYR_waitresp) syncResp.Wait();} // Tear down the connection // myData.Lock(); if (Link) {Link->Close(); Link = 0;} Active = 0; Suspend = 1; myData.UnLock(); // Indicate the problem // Say.Emsg("ClientMan", "Disconnected from", Host); XrdSysTimer::Snooze(dally); } while(1); // We should never get here // return (void *)0; } /******************************************************************************/ /* w h a t s U p */ /******************************************************************************/ int XrdCmsClientMan::whatsUp(const char *user, const char *path, unsigned int iMan) { EPNAME("whatsUp"); unsigned int xMan; int theDelay, inQ; bool lClose = false; // The cmsd did not respond. Increase silent count and see if restart is needed // Otherwise, increase the wait interval just in case things are just slow. // myData.Lock(); if (Active) {if (Active == RecvCnt) {if ((time(0)-lastTOut) >= repWait) {Silent++; if (Silent > nrMax) {Active = 0; Silent = 0; Suspend = 1; if (Link && iMan == manInst) {Link->Close(1); manInst++; lClose = true; } } else if (Silent & 0x02 && repWait < repWMax) repWait++; } } else {Active = RecvCnt; Silent = 0; lastTOut = time(0);} } // Calclulate how long to delay the client. This will be based on the number // of outstanding requests bounded by the config delay value. // inQ = XrdCmsClientMsg::inQ(); theDelay = inQ * qTime; xMan = manInst; myData.UnLock(); theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0); if (theDelay < minDelay) theDelay = minDelay; if (theDelay > maxDelay) theDelay = maxDelay; // Do Some tracing here // TRACE(Redirect, user <<" no resp from inst " <Connect(Host, Port, opts))) {XrdSysTimer::Snooze(dally); if (tries--) opts = XRDNET_NOEMSG; else {opts = 0; tries = 12;} continue; } // lp->Bind(XrdSysThread::ID()); memset(&Data, 0, sizeof(Data)); Data.Mode = CmsLoginData::kYR_director; Data.HoldTime = static_cast(getpid()); if (!(rc = XrdCmsLogin::Login(lp, Data))) break; lp->Close(); XrdSysTimer::Snooze(dally); } while(1); // Establish global state // manMutex.Lock(); doDebug |= (Data.Mode & CmsLoginData::kYR_debug ? manMask : 0); manMutex.UnLock(); // All went well, finally // myData.Lock(); Link = lp; Active = 1; Silent = 0; RecvCnt = 1; SendCnt = 1; Suspend = (Data.Mode & CmsLoginData::kYR_suspend); // Calculate how long we will wait for replies before delaying the client. // This is computed dynamically based on the expected response window. // if ((oldWait = (repWait*20/100)) < 2) oldWait = 2; if (Data.HoldTime > repWMax*1000) repWait = repWMax; else if (Data.HoldTime <= 0) repWait = repWMax; else {repWait = Data.HoldTime*3; repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0); if (repWait > repWMax) repWait = repWMax; else if (repWait < oldWait) repWait = oldWait; } qTime = (Data.HoldTime < 100 ? 100 : Data.HoldTime); lastTOut = time(0); myData.UnLock(); // Tell the world // sprintf(buff, "v %d", Data.Version); Say.Emsg("ClientMan", (Suspend ? "Connected to suspended" : "Connected to"), Host, buff); DEBUG(Host <<" qt=" <RecvAll((char *)&Response, sizeof(Response)) > 0) {int dlen = static_cast(ntohs(Response.datalen)); RecvCnt++; DEBUG(Link->Name() <<' ' < NetBuff->BuffSize()) && (Response.rrCode != kYR_data || !NetBuff->Resize(dlen))) Say.Emsg("ClientMan", "Excessive msg length from", Host); else {NetBuff->SetLen(dlen); return Link->RecvAll(NetBuff->Buffer(), dlen); } } return 0; } /******************************************************************************/ /* r e l a y R e s p */ /******************************************************************************/ void XrdCmsClientMan::relayResp() { EPNAME("relayResp"); XrdCmsResp *rp; // Remove the response object from our queue. // if (!(rp = RespQ.Rem(Response.streamid))) {DEBUG(Host <<" replied to non-existent request; id=" <Reply(HPfx, Response, NetBuff); // Obtain a new network buffer // NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len); } /******************************************************************************/ /* Private: c h k S t a t u s */ /******************************************************************************/ int XrdCmsClientMan::chkStatus() { static CmsUpdateRequest Updt = {{0, kYR_update, 0, 0}}; XrdSysMutexHelper mdMon(myData); time_t nowTime; // Count down the query count and ask again every 30 seconds // if (!chkCount--) {chkCount = chkVal; nowTime = time(0); if ((nowTime - lastUpdt) >= 30) {lastUpdt = nowTime; if (Active) Link->Send((char *)&Updt, sizeof(Updt)); } } return Suspend; } /******************************************************************************/ /* s e t S t a t u s */ /******************************************************************************/ void XrdCmsClientMan::setStatus() { EPNAME("setStatus"); const char *State = 0, *Event = "?"; myData.Lock(); if (Response.modifier & CmsStatusRequest::kYR_Suspend) {Event = "suspend"; if (!Suspend) {Suspend = 1; State = "suspended";} } else if (Response.modifier & CmsStatusRequest::kYR_Resume) {Event = "resume"; if (Suspend) {Suspend = 0; State = "resumed";} } myData.UnLock(); DEBUG(Host <<" sent " <