/******************************************************************************/ /* */ /* X r d C l i e n t C o n n M g r . c c */ /* */ /* Author: Fabrizio Furano (INFN Padova, 2004) */ /* Adapted from TXNetFile (root.cern.ch) originally done by */ /* Alvise Dorigo, Fabrizio Furano */ /* INFN Padova, 2003 */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ ////////////////////////////////////////////////////////////////////////// // // // The connection manager maps multiple logical connections on a single // // physical connection. // // There is one and only one logical connection per client // // and one and only one physical connection per server:port. // // Thus multiple objects withing a given application share // // the same physical TCP channel to communicate with a server. // // This reduces the time overhead for socket creation and reduces also // // the server load due to handling many sockets. // // // ////////////////////////////////////////////////////////////////////////// #include "XrdClient/XrdClientConnMgr.hh" #include "XrdClient/XrdClientDebug.hh" #include "XrdClient/XrdClientMessage.hh" #include "XrdClient/XrdClientLogConnection.hh" #include "XrdClient/XrdClientThread.hh" #include "XrdClient/XrdClientEnv.hh" #include "XrdClient/XrdClientSid.hh" #include "XrdOuc/XrdOucUtils.hh" #ifdef WIN32 #include "XrdSys/XrdWin32.hh" #endif #include #ifdef AIX #include #else #include #endif // For user info #ifndef WIN32 #include #endif // Max number allowed of logical connections ( 2**15 - 1, short int) #define XRC_MAXVECTSIZE 32767 //_____________________________________________________________________________ void * GarbageCollectorThread(void *arg, XrdClientThread *thr) { // Function executed in the garbage collector thread // Mask all allowed signals if (thr->MaskSignal(0) != 0) Error("GarbageCollectorThread", "Warning: problems masking signals"); XrdClientConnectionMgr *thisObj = (XrdClientConnectionMgr *)arg; thr->SetCancelDeferred(); thr->SetCancelOn(); while (1) { thr->CancelPoint(); thisObj->GarbageCollect(); thr->CancelPoint(); sleep(30); } return 0; } //_____________________________________________________________________________ int DisconnectElapsedPhyConn(const char *key, XrdClientPhyConnection *p, void *voidcmgr) { // Function applied to the hash table to disconnect the unused elapsed // physical connections XrdClientConnectionMgr *cmgr = (XrdClientConnectionMgr *)voidcmgr; assert(cmgr != 0); if (p) { if ((p->GetLogConnCnt() <= 0) && p->ExpiredTTL() && p->IsValid()) { p->Touch(); p->Disconnect(); } if (!p->IsValid()) { // Make sure that we kill the socket in the case of conns killed by the server p->Touch(); p->Disconnect(); // And then add the instance to the trashed list cmgr->fPhyTrash.Push_back(p); // And remove the current from here return -1; } } // Process next return 0; } //_____________________________________________________________________________ int DumpPhyConn(const char *key, XrdClientPhyConnection *p, void *voidcmgr) { if (!p) { Info(XrdClientDebug::kDUMPDEBUG, "DumpPhyConn", "Phyconn entry, key=NULL"); return 0; } Info(XrdClientDebug::kDUMPDEBUG, "DumpPhyConn", "Phyconn entry, key='" << (key ? key : "***def***") << "', LogCnt=" << p->GetLogConnCnt() << (p->IsValid() ? " Valid" : " NotValid") ) // Process next return 0; } //_____________________________________________________________________________ int DestroyPhyConn(const char *key, XrdClientPhyConnection *p, void *voidcmgr) { // Function applied to the hash table to destroy all the phyconns if (p) { p->Touch(); p->Disconnect(); p->UnsolicitedMsgHandler = 0; delete(p); } // Process next, remove current item return -1; } //_____________________________________________________________________________ XrdClientConnectionMgr::XrdClientConnectionMgr() : fSidManager(0), fGarbageColl(0) { // XrdClientConnectionMgr constructor. // Creates a Connection Manager object. // Starts the garbage collector thread. BootUp(); } //_____________________________________________________________________________ XrdClientConnectionMgr::~XrdClientConnectionMgr() { // Deletes mutex locks, stops garbage collector thread. ShutDown(); } //------------------------------------------------------------------------------ // Initializer //------------------------------------------------------------------------------ bool XrdClientConnectionMgr::BootUp() { fLastLogIdUsed = 0; fGarbageColl = new XrdClientThread(GarbageCollectorThread); if (!fGarbageColl) Error("ConnectionMgr", "Can't create garbage collector thread: out of system resources"); fGarbageColl->Run(this); fSidManager = new XrdClientSid(); if (!fSidManager) { Error("ConnectionMgr", "Can't create sid manager: out of system resources"); abort(); } return true; } bool XrdClientConnectionMgr::ShutDown() { fPhyHash.Apply(DumpPhyConn, this); { XrdSysMutexHelper mtx(fMutex); for( int i = 0; i < fLogVec.GetSize(); i++) if (fLogVec[i]) Disconnect(i, TRUE); } if (fGarbageColl) { void *ret; fGarbageColl->Cancel(); fGarbageColl->Join(&ret); delete fGarbageColl; } GarbageCollect(); fPhyHash.Apply(DestroyPhyConn, this); for(int i = fPhyTrash.GetSize()-1; i >= 0; i--) DestroyPhyConn( "Trashed connection", fPhyTrash[i], this); fPhyTrash.Clear(); fPhyHash.Purge(); fLogVec.Clear(); delete fSidManager; fSidManager = 0; fGarbageColl = 0; return true; } //_____________________________________________________________________________ void XrdClientConnectionMgr::GarbageCollect() { // Get rid of unused physical connections. 'Unused' means not used for a // TTL time from any logical one. The TTL depends on the kind of remote // server. For a load balancer the TTL is very high, while for a data server // is quite small. // Mutual exclusion on the vectors and other vars XrdSysMutexHelper mtx(fMutex); if (fPhyHash.Num() > 0) { if(DebugLevel() >= XrdClientDebug::kUSERDEBUG) fPhyHash.Apply(DumpPhyConn, this); // Cycle all the physical connections to disconnect the elapsed ones fPhyHash.Apply(DisconnectElapsedPhyConn, this); } // Cycle all the trashed physical connections to destroy the elapsed once more // after a disconnection. Doing this way, a phyconn in async mode has // all the time it needs to terminate its reader thread pool for (int i = fPhyTrash.GetSize()-1; i >= 0; i--) { DumpPhyConn("Trashed connection", fPhyTrash[i], this); if ( !fPhyTrash[i] || ((fPhyTrash[i]->GetLogConnCnt() <= 0) && (fPhyTrash[i]->ExpiredTTL())) ) { if (fPhyTrash[i] && (fPhyTrash[i]->GetReaderThreadsCnt() <= 0)) { delete fPhyTrash[i]; } fPhyTrash.Erase(i); } } } //_____________________________________________________________________________ int XrdClientConnectionMgr::Connect(XrdClientUrlInfo RemoteServ) { // Connects to the remote server: // - Looks first for an existing physical connection already bound to // User@RemoteAddress:TcpPort; // - If needed, creates a TCP channel to User@RemoteAddress:TcpPort // (this is a physical connection); // - Creates a logical connection and binds it to the previous // created physical connection; // - Returns the logical connection ID. Every client will use this // ID to deal with the server. XrdClientLogConnection *logconn = 0; XrdClientPhyConnection *phyconn = 0; CndVarInfo *cnd = 0; int newid = -1; bool phyfound = FALSE; // First we get a new logical connection object Info(XrdClientDebug::kHIDEBUG, "Connect", "Creating a logical connection..."); logconn = new XrdClientLogConnection(fSidManager); if (!logconn) { Error("Connect", "Object creation failed. Aborting."); abort(); } // If empty, fill the user name with the default to avoid fake mismatches if (RemoteServ.User.length() <= 0) { char name[256]; #ifndef WIN32 RemoteServ.User = (XrdOucUtils::UserName(getuid(), name, sizeof(name)) ? "" : name); #else DWORD length = sizeof (name); ::GetUserName(name, &length); RemoteServ.User = name; #endif } // Keys XrdOucString key; XrdOucString key1(RemoteServ.User.c_str(), 256); key1 += '@'; key1 += RemoteServ.Host; key1 += ':'; key1 += RemoteServ.Port; XrdOucString key2(RemoteServ.User.c_str(), 256); key2 += '@'; key2 += RemoteServ.HostAddr; key2 += ':'; key2 += RemoteServ.Port; do { fMutex.Lock(); cnd = 0; cnd = fConnectingCondVars.Find(key1.c_str()); if (!cnd) cnd = fConnectingCondVars.Find(key2.c_str()); // If there are no connection attempts in progress... if (!cnd) { // If we already have a physical connection to that host:port, // then we use that if (fPhyHash.Num() > 0) { XrdClientPhyConnection *p = 0; // We must avoid the unfortunate pick of a disconnected phyconn GarbageCollect(); if (((p = fPhyHash.Find(key1.c_str())) || (p = fPhyHash.Find(key2.c_str()))) && p->IsValid()) { // We link that physical connection to the new logical connection phyconn = p; phyconn->CountLogConn(); phyconn->Touch(); logconn->SetPhyConnection(phyconn); phyfound = TRUE; } else { // no connection attempts in progress and no already established connections // Mark this as an ongoing attempt // Now we have a pending conn attempt fConnectingCondVars.Rep(key1.c_str(), new CndVarInfo(), 0, Hash_keepdata); } } fMutex.UnLock(); } else { // this is an attempt which must wait for the result of a previous one // In any case after the wait we loop and recheck cnd->cv.Lock(); cnd->cnt++; fMutex.UnLock(); cnd->cv.Wait(); cnd->cnt--; cnd->cv.UnLock(); } } while (cnd); // here cnd means "if there is a condvar to wait on" // We are here if cnd == 0 if (!phyfound) { Info(XrdClientDebug::kHIDEBUG, "Connect", "Physical connection not found. Creating a new one..."); if (DebugLevel() >= XrdClientDebug::kHIDEBUG) fPhyHash.Apply(DumpPhyConn, this); // If not already present, then we must build a new physical connection, // and try to connect it // While we are trying to connect, the mutex must be unlocked // Note that at this point logconn is a pure local instance, so it // does not need to be protected by mutex if (!(phyconn = new XrdClientPhyConnection(this, fSidManager))) { Error("Connect", "Object creation failed. Aborting."); abort(); } if ( phyconn && phyconn->Connect(RemoteServ) ) { phyconn->CountLogConn(); logconn->SetPhyConnection(phyconn); if (DebugLevel() >= XrdClientDebug::kHIDEBUG) Info(XrdClientDebug::kHIDEBUG, "Connect", "New physical connection to server " << RemoteServ.Host << ":" << RemoteServ.Port << " succesfully created."); } else { // The connection attempt failed, so we signal all the threads waiting for a result { XrdSysMutexHelper mtx(fMutex); int cnt; key = key1; cnd = fConnectingCondVars.Find(key.c_str()); if (!cnd) { key = key2; cnd = fConnectingCondVars.Find(key.c_str()); } if (cnd) { cnd->cv.Lock(); cnd->cv.Broadcast(); fConnectingCondVars.Del(key.c_str()); cnt = cnd->cnt; cnd->cv.UnLock(); if (!cnt) { Info(XrdClientDebug::kHIDEBUG, "Connect", "Destroying connection condvar for " << key ); delete cnd; } } } delete logconn; // And then add the instance to the trashed list phyconn->Touch(); fPhyTrash.Push_back(phyconn); //delete phyconn; return -1; } } // Now, we are connected to the host desired. // The physical connection can be old or newly created { XrdSysMutexHelper mtx(fMutex); phyconn->WipeStreamid(logconn->Streamid()); // Then, if needed, we push the physical connection into its vector if (!phyfound) { if (!phyconn) Error("Connect"," problems connecting to " << key1); fPhyHash.Rep(key1.c_str(), phyconn, 0, Hash_keepdata); } if (fLogVec.GetSize() < XRC_MAXVECTSIZE) { // Then we push the logical connection into its vector, up to a max size fLogVec.Push_back(logconn); // and the new position is the ID newid = fLogVec.GetSize()-1; } else { // The array is too big, get a free slot, if any newid = -1; for (int i = 0; i < fLogVec.GetSize(); i++) { int idx = (fLastLogIdUsed + i) % fLogVec.GetSize(); if (!fLogVec[idx]) { fLogVec[idx] = logconn; newid = idx; fLastLogIdUsed = idx; break; } } if (newid == -1) { delete logconn; Error("Connect", "Critical error - Out of allocated resources:" " max number allowed of logical connections reached ("<= XrdClientDebug::kHIDEBUG) { int logCnt = 0; for (int i=0; i < fLogVec.GetSize(); i++) if (fLogVec[i]) logCnt++; Info(XrdClientDebug::kHIDEBUG, "Connect", "LogConn: size:" << fLogVec.GetSize() << " count: " << logCnt << "PhyConn: size:" << fPhyHash.Num()); } // The connection attempt went ok, so we signal all the threads waiting for a result, if we can still find the corresponding condvar int cnt; // if (!phyfound) { key = key1; cnd = fConnectingCondVars.Find(key.c_str()); if (!cnd) { key = key2; cnd = fConnectingCondVars.Find(key.c_str()); } if (cnd) { cnd->cv.Lock(); cnd->cv.Broadcast(); fConnectingCondVars.Del(key.c_str()); cnt = cnd->cnt; cnd->cv.UnLock(); if (!cnt) { Info(XrdClientDebug::kHIDEBUG, "Connect", "Destroying connection condvar for " << key ); delete cnd; } } // } } // mutex return newid; } //_____________________________________________________________________________ void XrdClientConnectionMgr::Disconnect(int LogConnectionID, bool ForcePhysicalDisc) { // Deletes a logical connection. // Also deletes the related physical one if ForcePhysicalDisc=TRUE. if (LogConnectionID < 0) return; { XrdSysMutexHelper mtx(fMutex); if ((LogConnectionID < 0) || (LogConnectionID >= fLogVec.GetSize()) || (!fLogVec[LogConnectionID])) { Error("Disconnect", "Destroying nonexistent logconn " << LogConnectionID); return; } fLogVec[LogConnectionID]->GetPhyConnection()->WipeStreamid(fLogVec[LogConnectionID]->Streamid()); if (ForcePhysicalDisc) { // We disconnect the phyconn // But it will be removed by the garbagecollector as soon as possible // Note that here we cannot destroy the phyconn, since there can be other // logconns pointing to it the phyconn will be removed when there are no // more logconns pointing to it fLogVec[LogConnectionID]->GetPhyConnection()->UnsolicitedMsgHandler = 0; fLogVec[LogConnectionID]->GetPhyConnection()->Disconnect(); GarbageCollect(); } fLogVec[LogConnectionID]->GetPhyConnection()->Touch(); delete fLogVec[LogConnectionID]; fLogVec[LogConnectionID] = 0; Info(XrdClientDebug::kHIDEBUG, "Disconnect", " LogConnID: " << LogConnectionID <<" destroyed"); } } //_____________________________________________________________________________ int XrdClientConnectionMgr::ReadRaw(int LogConnectionID, void *buffer, int BufferLength) { // Read BufferLength bytes from the logical connection LogConnectionID XrdClientLogConnection *logconn; logconn = GetConnection(LogConnectionID); if (logconn) { return logconn->ReadRaw(buffer, BufferLength); } else { Error("ReadRaw", "There's not a logical connection with id " << LogConnectionID); return(TXSOCK_ERR); } } //_____________________________________________________________________________ XrdClientMessage *XrdClientConnectionMgr::ReadMsg(int LogConnectionID) { XrdClientLogConnection *logconn; XrdClientMessage *mex; logconn = GetConnection(LogConnectionID); // Now we get the message from the queue, with the timeouts needed // Note that the physical connection know about streamids, NOT logconnids !! mex = logconn->GetPhyConnection()->ReadMessage(logconn->Streamid()); // Return the message unmarshalled to ClientServerCmd return mex; } //_____________________________________________________________________________ int XrdClientConnectionMgr::WriteRaw(int LogConnectionID, const void *buffer, int BufferLength, int substreamid) { // Write BufferLength bytes into the logical connection LogConnectionID XrdClientLogConnection *logconn; logconn = GetConnection(LogConnectionID); if (logconn) { return logconn->WriteRaw(buffer, BufferLength, substreamid); } else { Error("WriteRaw", "There's not a logical connection with id " << LogConnectionID); return(TXSOCK_ERR); } } //_____________________________________________________________________________ XrdClientLogConnection *XrdClientConnectionMgr::GetConnection( int LogConnectionID) { // Return a logical connection object that has LogConnectionID as its ID. XrdSysMutexHelper mtx(fMutex); return (LogConnectionID > -1) ? fLogVec[LogConnectionID] : (XrdClientLogConnection *)0; } //____________________________________________________________________________ XrdClientPhyConnection *XrdClientConnectionMgr::GetPhyConnection(XrdClientUrlInfo server) { // Gets pointer to the physical connection to 'server', if any. // Return 0 if none is found. XrdClientPhyConnection *p = 0; // If empty, fill the user name with the default to avoid fake mismatches if (server.User.length() <= 0) { char name[256]; #ifndef WIN32 server.User = (XrdOucUtils::UserName(getuid(), name, sizeof(name)) ? "" : name); #else DWORD length = sizeof (name); ::GetUserName(name, &length); server.User = name; #endif } // Keys XrdOucString key; XrdOucString key1(server.User.c_str(), 256); key1 += '@'; key1 += server.Host; key1 += ':'; key1 += server.Port; XrdOucString key2(server.User.c_str(), 256); key2 += '@'; key2 += server.HostAddr; key2 += ':'; key2 += server.Port; if (fPhyHash.Num() > 0) { if (((p = fPhyHash.Find(key1.c_str())) || (p = fPhyHash.Find(key2.c_str()))) && !(p->IsValid())) { // We found an invalid connection: do not use it p = 0; } } // Done return p; } //_____________________________________________________________________________ UnsolRespProcResult XrdClientConnectionMgr::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *sender, XrdClientMessage *unsolmsg) { // We are here if an unsolicited response comes from a physical connection // The response comes in the form of an TXMessage *, that must NOT be // destroyed after processing. It is destroyed by the first sender. // Remember that we are in a separate thread, since unsolicited responses // are asynchronous by nature. //Info(XrdClientDebug::kDUMPDEBUG, "ConnectionMgr", // "Processing unsolicited response (ID:"<HeaderSID()<<")"); UnsolRespProcResult res = kUNSOL_CONTINUE; // Local processing .... // Now we propagate the message to the interested objects. // In our architecture, the interested objects are the objects which // self-registered in the logical connections belonging to the Phyconn // which threw the event // So we throw the evt towards each logical connection { // Find an interested logid XrdSysMutexHelper mtx(fMutex); for (int i = 0; i < fLogVec.GetSize(); i++) { if ( fLogVec[i] && (fLogVec[i]->GetPhyConnection() == sender) ) { fMutex.UnLock(); res = fLogVec[i]->ProcessUnsolicitedMsg(sender, unsolmsg); fMutex.Lock(); if (res != kUNSOL_CONTINUE) break; } } } return res; }