/******************************************************************************/ /* */ /* X r d C l i e n t . c c */ /* */ /* Author: Fabrizio Furano (INFN Padova, 2004) */ /* Adapted from TXNetFile (root.cern.ch) originally done by */ /* Alvise Dorigo, Fabrizio Furano */ /* INFN Padova, 2003 */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ ////////////////////////////////////////////////////////////////////////// // // // A UNIX reference client for xrootd. // // // ////////////////////////////////////////////////////////////////////////// #include "XrdClient/XrdClient.hh" #include "XrdClient/XrdClientDebug.hh" #include "XrdClient/XrdClientThread.hh" #include "XrdClient/XrdClientUrlSet.hh" #include "XrdClient/XrdClientConn.hh" #include "XrdClient/XrdClientEnv.hh" #include "XrdClient/XrdClientConnMgr.hh" #include "XrdClient/XrdClientSid.hh" #include "XrdClient/XrdClientMStream.hh" #include "XrdClient/XrdClientReadV.hh" #include "XrdOuc/XrdOucCRC.hh" #include "XrdClient/XrdClientReadAhead.hh" #include "XrdClient/XrdClientCallback.hh" #include #ifndef WIN32 #include #endif #include #include #include #include XrdSysSemWait XrdClient::fConcOpenSem(DFLT_MAXCONCURRENTOPENS); //_____________________________________________________________________________ // Calls the Open func in order to parallelize the Open requests // void *FileOpenerThread(void *arg, XrdClientThread *thr) { // Function executed in the garbage collector thread XrdClient *thisObj = (XrdClient *)arg; thr->SetCancelDeferred(); thr->SetCancelOn(); bool res = thisObj->TryOpen(thisObj->fOpenPars.mode, thisObj->fOpenPars.options, false); if (thisObj->fXrdCcb) thisObj->fXrdCcb->OpenComplete(thisObj, thisObj->fXrdCcbArg, res); return 0; } //_____________________________________________________________________________ XrdClient::XrdClient(const char *url, XrdClientCallback *XrdCcb, void *XrdCcbArg) : XrdClientAbs(XrdCcb, XrdCcbArg) { fReadAheadMgr = 0; fReadTrimBlockSize = 0; fOpenerTh = 0; fOpenProgCnd = new XrdSysCondVar(0); fReadWaitData = new XrdSysCondVar(0); memset(&fStatInfo, 0, sizeof(fStatInfo)); memset(&fOpenPars, 0, sizeof(fOpenPars)); memset(&fCounters, 0, sizeof(fCounters)); // Pick-up the latest setting of the debug level DebugSetLevel(EnvGetLong(NAME_DEBUG)); if (!ConnectionManager) Info(XrdClientDebug::kUSERDEBUG, "Create", "(C) 2004-2010 by the Xrootd group. XrdClient $Revision$ - Xrootd version: " << XrdVSTRING); #ifndef WIN32 signal(SIGPIPE, SIG_IGN); #endif fInitialUrl = url; fConnModule = new XrdClientConn(); if (!fConnModule) { Error("Create","Object creation failed."); abort(); } fConnModule->SetRedirHandler(this); int CacheSize = EnvGetLong(NAME_READCACHESIZE); int RaSize = EnvGetLong(NAME_READAHEADSIZE); int RmPolicy = EnvGetLong(NAME_READCACHEBLKREMPOLICY); int ReadAheadStrategy = EnvGetLong(NAME_READAHEADSTRATEGY); SetReadAheadStrategy(ReadAheadStrategy); SetBlockReadTrimming(EnvGetLong(NAME_READTRIMBLKSZ)); fUseCache = (CacheSize > 0); SetCacheParameters(CacheSize, RaSize, RmPolicy); } //_____________________________________________________________________________ XrdClient::~XrdClient() { if (IsOpen_wait()) Close(); // Terminate the opener thread fOpenProgCnd->Lock(); if (fOpenerTh) { fOpenerTh->Cancel(); fOpenerTh->Join(); delete fOpenerTh; fOpenerTh = 0; } fOpenProgCnd->UnLock(); if (fConnModule) delete fConnModule; if (fReadAheadMgr) delete fReadAheadMgr; fReadAheadMgr = 0; delete fReadWaitData; delete fOpenProgCnd; PrintCounters(); } //_____________________________________________________________________________ bool XrdClient::IsOpen_inprogress() { // Non blocking access to the 'inprogress' flag bool res; if (!fOpenProgCnd) return false; fOpenProgCnd->Lock(); res = fOpenPars.inprogress; fOpenProgCnd->UnLock(); return res; }; //_____________________________________________________________________________ bool XrdClient::IsOpen_wait() { bool res; if (!fOpenProgCnd) return false; fOpenProgCnd->Lock(); if (fOpenPars.inprogress) { fOpenProgCnd->Wait(); if (fOpenerTh) { // To prevent deadlocks in the case of // accesses from the Open() callback fOpenProgCnd->UnLock(); fOpenerTh->Join(); delete fOpenerTh; fOpenerTh = 0; // We need the lock again... sigh fOpenProgCnd->Lock(); } } res = fOpenPars.opened; fOpenProgCnd->UnLock(); return res; }; //_____________________________________________________________________________ void XrdClient::TerminateOpenAttempt() { fOpenProgCnd->Lock(); fOpenPars.inprogress = false; fOpenProgCnd->Broadcast(); fOpenProgCnd->UnLock(); fConcOpenSem.Post(); //cout << "Mytest " << time(0) << " File: " << fUrl.File << " - Open finished." << endl; } #define OpenErr(a,b) fConnModule->LastServerError.errnum=a;\ strcpy(fConnModule->LastServerError.errmsg,b);\ Error("Open", b) //_____________________________________________________________________________ bool XrdClient::Open(kXR_unt16 mode, kXR_unt16 options, bool doitparallel) { class urlHelper {public: void Erase(XrdClientUrlInfo *url){urlSet->EraseUrl(url);} XrdClientUrlInfo *Get() {return urlSet->GetARandomUrl(seed);} int Init(XrdOucString urls) {if (urlSet) delete urlSet; urlSet = new XrdClientUrlSet(urls); iSize = (urlSet->IsValid() ? urlSet->Size():0); return iSize; } int Size() {return iSize;} urlHelper() : urlSet(0), iSize(0) {seed = static_cast (getpid() ^ getppid()); } ~urlHelper() {if (urlSet) delete urlSet;} private: XrdClientUrlSet *urlSet; int iSize; unsigned int seed; }; urlHelper urlList; // But we initialize the internal params... fOpenPars.opened = FALSE; fOpenPars.options = options; fOpenPars.mode = mode; // Now we try to set up the first connection // We cycle through the list of urls given in fInitialUrl // Max number of tries int connectMaxTry = EnvGetLong(NAME_FIRSTCONNECTMAXCNT); fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT)); // // Now start the connection phase, picking randomly from UrlList // int urlstried = 0; fConnModule->LastServerError.errnum = kXR_noErrorYet; for (int connectTry = 0; (connectTry < connectMaxTry) && (!fConnModule->IsConnected()); connectTry++) { int urlCount; if ((urlCount = urlList.Init(fInitialUrl)) < 1) { OpenErr(kXR_ArgInvalid, "The URL provided is incorrect."); return FALSE; } XrdClientUrlInfo *thisUrl = 0; urlstried = (urlstried == urlCount) ? 0 : urlstried; if ( fConnModule->IsOpTimeLimitElapsed(time(0)) ) { // We have been so unlucky and wasted too much time in connecting and being redirected fConnModule->Disconnect(TRUE); Error("Open", "Access to server failed: Too much time elapsed without success."); break; } bool nogoodurl = TRUE; while (urlCount--) { // Get an url from the available set if ((thisUrl = urlList.Get())) { if (fConnModule->CheckHostDomain(thisUrl->Host)) { nogoodurl = FALSE; Info(XrdClientDebug::kHIDEBUG, "Open", "Trying to connect to " << thisUrl->Host << ":" << thisUrl->Port << ". Connect try " << connectTry+1); fConnModule->Connect(*thisUrl, this); // To find out if we have tried the whole URLs set urlstried++; if (!(fConnModule->IsConnected())) continue; break; } else { // Invalid domain: drop the url and move to next, if any urlList.Erase(thisUrl); continue; } } } if (nogoodurl) { OpenErr(kXR_NotAuthorized, "Access denied to all URL domains requested"); break; } // We are connected to a host. Let's handshake with it. if (fConnModule->IsConnected()) { // Now the have the logical Connection ID, that we can use as streamid for // communications with the server Info(XrdClientDebug::kHIDEBUG, "Open", "The logical connection id is " << fConnModule->GetLogConnID() << "."); fConnModule->SetUrl(*thisUrl); fUrl = *thisUrl; Info(XrdClientDebug::kHIDEBUG, "Open", "Working url is " << thisUrl->GetUrl()); // after connection deal with server if (!fConnModule->GetAccessToSrv()) { if (fConnModule->GetRedirCnt() >= fConnModule->GetMaxRedirCnt()) { // We have been so unlucky. // The max number of redirections was exceeded while logging in fConnModule->Disconnect(TRUE); OpenErr(kXR_ServerError, "Unable to connect; too many redirections."); break; } if (fConnModule->LastServerError.errnum == kXR_NotAuthorized) { if (urlstried == urlList.Size()) { // Authentication error: we tried all the indicated URLs: // does not make much sense to retry fConnModule->Disconnect(TRUE); XrdOucString msg(fConnModule->LastServerError.errmsg); msg.erasefromend(1); Error("Open", "Authentication failure: " << msg); connectTry = connectMaxTry; } else { XrdOucString msg(fConnModule->LastServerError.errmsg); msg.erasefromend(1); Info(XrdClientDebug::kHIDEBUG, "Open", "Authentication failure: " << msg); } } else { fConnModule->Disconnect(TRUE); Error("Open", "Access to server failed: error: " << fConnModule->LastServerError.errnum << " (" << fConnModule->LastServerError.errmsg << ") - retrying."); } } else { Info(XrdClientDebug::kUSERDEBUG, "Open", "Access to server granted."); break; } } // The server denied access. We have to disconnect. Info(XrdClientDebug::kHIDEBUG, "Open", "Disconnecting."); fConnModule->Disconnect(FALSE); if (connectTry < connectMaxTry-1) { if (DebugLevel() >= XrdClientDebug::kUSERDEBUG) Info(XrdClientDebug::kUSERDEBUG, "Open", "Connection attempt failed. Sleeping " << EnvGetLong(NAME_RECONNECTWAIT) << " seconds."); sleep(EnvGetLong(NAME_RECONNECTWAIT)); } } //for connect try if (!fConnModule->IsConnected()) { if (fConnModule->LastServerError.errnum == kXR_noErrorYet) {OpenErr(kXR_noserver, "Server is unreachable.");} return FALSE; } // // Variable initialization // If the server is a new xrootd ( load balancer or data server) // if ((fConnModule->GetServerType() != kSTRootd) && (fConnModule->GetServerType() != kSTNone)) { // Now we are connected to a server that didn't redirect us after the // login/auth phase // let's continue with the openfile sequence Info(XrdClientDebug::kUSERDEBUG, "Open", "Opening the remote file " << fUrl.File); if (!TryOpen(mode, options, doitparallel)) { Error("Open", "Error opening the file " << fUrl.File << " on host " << fUrl.Host << ":" << fUrl.Port); if (fXrdCcb && !doitparallel) fXrdCcb->OpenComplete(this, fXrdCcbArg, false); OpenErr(kXR_Cancelled, "Open failed for unknown reason."); return FALSE; } else { if (doitparallel) { Info(XrdClientDebug::kUSERDEBUG, "Open", "File open in progress."); } else { Info(XrdClientDebug::kUSERDEBUG, "Open", "File opened succesfully."); if (fXrdCcb) fXrdCcb->OpenComplete(this, fXrdCcbArg, true); } } } else { // the server is an old rootd if (fConnModule->GetServerType() == kSTRootd) { OpenErr(kXR_ArgInvalid, "Server is not an xrootd server."); return FALSE; } if (fConnModule->GetServerType() == kSTNone) { OpenErr(kXR_ArgInvalid, "Server is not an xrootd server."); return FALSE; } } fConnModule->LastServerError.errnum = kXR_noErrorYet; return TRUE; } //_____________________________________________________________________________ int XrdClient::Read(void *buf, long long offset, int len) { XrdClientIntvList cacheholes; long blkstowait; char *tmpbuf = (char *)buf; Info( XrdClientDebug::kHIDEBUG, "Read", "Read(offs=" << offset << ", len=" << len << ")" ); if (!IsOpen_wait()) { Error("Read", "File not opened."); return 0; } // Set the max transaction duration fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT)); fCounters.ReadRequests++; int cachesize = 0; long long cachebytessubmitted = 0; long long cachebyteshit = 0; long long cachemisscount = 0; float cachemissrate = 0.0; long long cachereadreqcnt = 0; float cachebytesusefulness = 0.0; bool cachegood = fConnModule->GetCacheInfo(cachesize, cachebytessubmitted, cachebyteshit, cachemisscount, cachemissrate, cachereadreqcnt, cachebytesusefulness); // Note: old servers do not support unsolicited responses for reads // We also use the plain sync reading if the size of the block is excessive // or no cache at all is used if (!fUseCache || !cachegood || (cachesize < len) || (fConnModule->GetServerProtocol() < 0x00000270) ) { // Without caching // Prepare a request header ClientRequest readFileRequest; memset( &readFileRequest, 0, sizeof(readFileRequest) ); fConnModule->SetSID(readFileRequest.header.streamid); readFileRequest.read.requestid = kXR_read; memcpy( readFileRequest.read.fhandle, fHandle, sizeof(fHandle) ); readFileRequest.read.offset = offset; readFileRequest.read.rlen = len; readFileRequest.read.dlen = 0; if (!fConnModule->SendGenCommand(&readFileRequest, 0, 0, (void *)buf, FALSE, (char *)"ReadBuffer")) return 0; fCounters.ReadBytes += fConnModule->LastServerResp.dlen; return fConnModule->LastServerResp.dlen; } // Ok, from now on we are sure that we have to deal with the cache // Do the read ahead long long araoffset; long aralen; if (fReadAheadMgr && fUseCache && !fReadAheadMgr->GetReadAheadHint(offset, len, araoffset, aralen, fReadTrimBlockSize) && fConnModule->CacheWillFit(aralen)) { long long o = araoffset; long l = aralen; while (l > 0) { long ll = xrdmin(4*1024*1024, l); Read_Async(o, ll, true); l -= ll; o += ll; } } struct XrdClientStatInfo stinfo; Stat(&stinfo); len = xrdmax(0, xrdmin(len, stinfo.size - offset)); bool retrysync = false; long totbytes = 0; bool cachehit = true; // we cycle until we get all the needed data do { fReadWaitData->Lock(); cacheholes.Clear(); blkstowait = 0; long bytesgot = 0; if (!retrysync) { bytesgot = fConnModule->GetDataFromCache(tmpbuf+totbytes, offset + totbytes, len + offset - 1, true, cacheholes, blkstowait); totbytes += bytesgot; Info(XrdClientDebug::kHIDEBUG, "Read", "Cache response: got " << bytesgot << "@" << offset + totbytes << " bytes. Holes= " << cacheholes.GetSize() << " Outstanding= " << blkstowait); // If the cache gives the data to us // we don't need to ask the server for them... in principle! if( bytesgot >= len ) { // The cache gave us all the requested data Info(XrdClientDebug::kHIDEBUG, "Read", "Found data in cache. len=" << len << " offset=" << offset); fReadWaitData->UnLock(); if (cachehit) fCounters.ReadHits++; fCounters.ReadBytes += len; return len; } // We are here if the cache did not give all the data to us // We should have a list of blocks to request for (int i = 0; i < cacheholes.GetSize(); i++) { long long o; long l; o = cacheholes[i].beginoffs; l = cacheholes[i].endoffs - o + 1; Info( XrdClientDebug::kUSERDEBUG, "Read", "Hole in the cache: offs=" << o << ", len=" << l ); XrdClientReadAheadMgr::TrimReadRequest(o, l, 0, fReadTrimBlockSize); Read_Async(o, l, false); cachehit = false; } } // If we got nothing from the cache let's do it sync and exit! // Note that this part has the side effect of triggering the recovery actions // if we get here after an error (or timeout) // Hence it's not a good idea to make async also this read // Remember also that a sync read request must not be modified if it's going to be // written into the application-given buffer if (retrysync || (!bytesgot && !blkstowait && !cacheholes.GetSize())) { cachehit = false; fReadWaitData->UnLock(); memset(&fConnModule->LastServerError, 0, sizeof(fConnModule->LastServerError)); fConnModule->LastServerError.errnum = kXR_noErrorYet; Info( XrdClientDebug::kHIDEBUG, "Read", "Read(offs=" << offset << ", len=" << len << "). Going sync." ); if ((fReadTrimBlockSize > 0) && !retrysync) { long long offs = offset; long l = len; XrdClientReadAheadMgr::TrimReadRequest(offs, l, 0, fReadTrimBlockSize); Read_Async(offs, l, false); blkstowait++; } else { // Prepare a request header ClientRequest readFileRequest; memset( &readFileRequest, 0, sizeof(readFileRequest) ); fConnModule->SetSID(readFileRequest.header.streamid); readFileRequest.read.requestid = kXR_read; memcpy( readFileRequest.read.fhandle, fHandle, sizeof(fHandle) ); readFileRequest.read.offset = offset; readFileRequest.read.rlen = len; readFileRequest.read.dlen = 0; if (!fConnModule->SendGenCommand(&readFileRequest, 0, 0, (void *)buf, FALSE, (char *)"ReadBuffer")) return 0; fCounters.ReadBytes += len; return len; } retrysync = false; } // Now it's time to sleep // This thread will be awakened when new data will arrive if ( (blkstowait > 0) || cacheholes.GetSize() ) { Info( XrdClientDebug::kHIDEBUG, "Read", "Waiting " << blkstowait+cacheholes.GetSize() << "outstanding blocks." ); if (!fConnModule->IsPhyConnConnected() || fReadWaitData->Wait( EnvGetLong(NAME_REQUESTTIMEOUT) ) || (fConnModule->LastServerError.errnum != kXR_noErrorYet) ) { fConnModule->LastServerError.errnum = kXR_noErrorYet; if (DebugLevel() >= XrdClientDebug::kUSERDEBUG) { fConnModule->PrintCache(); Error( "Read", "Timeout or error waiting outstanding blocks. " "Retrying sync! " "List of outstanding reqs follows." ); ConnectionManager->SidManager()->PrintoutOutstandingRequests(); } retrysync = true; } } fReadWaitData->UnLock(); } while ((blkstowait > 0) || cacheholes.GetSize()); // To lower caching overhead in copy-like applications if (EnvGetLong(NAME_REMUSEDCACHEBLKS)) { Info(XrdClientDebug::kHIDEBUG, "Read", "Removing used blocks " << 0 << "->" << offset ); fConnModule->RemoveDataFromCache(0, offset); } if (cachehit) fCounters.ReadHits++; fCounters.ReadBytes += len; return len; } //_____________________________________________________________________________ kXR_int64 XrdClient::ReadV(char *buf, kXR_int64 *offsets, int *lens, int nbuf) { // If buf==0 then the request is considered as asynchronous if (!nbuf) return 0; if (!IsOpen_wait()) { Error("ReadV", "File not opened."); return 0; } // This means problems in getting the protocol version if ( fConnModule->GetServerProtocol() < 0 ) { Info(XrdClientDebug::kHIDEBUG, "ReadV", "Problems retrieving protocol version run by the server" ); return -1; } // This means the server won't support it if ( fConnModule->GetServerProtocol() < 0x00000247 ) { Info(XrdClientDebug::kHIDEBUG, "ReadV", "The server is an old version " << fConnModule->GetServerProtocol() << " and doesn't support vectored reading" ); return -1; } Stat(0); // Set the max transaction duration fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT)); // We pre-process the request list in order to make it compliant // with the restrictions imposed by the server XrdClientVector reqvect(nbuf); // First we want to know how much data we expect kXR_int64 maxbytes = 0; for (int ii = 0; ii < nbuf; ii++) maxbytes += lens[ii]; // Then we get a suggestion about the splitsize to use int spltsize = 0; int reqsperstream = 0; XrdClientMStream::GetGoodSplitParameters(fConnModule, spltsize, reqsperstream, maxbytes); // To optimize the splitting for the readv, we need the greater multiple // of READV_MAXCHUNKSIZE... maybe yes, maybe not... // if (spltsize > 2*READV_MAXCHUNKSIZE) blah blah // Each subchunk must not exceed spltsize bytes for (int ii = 0; ii < nbuf; ii++) XrdClientReadV::PreProcessChunkRequest(reqvect, offsets[ii], lens[ii], fStatInfo.size, spltsize ); int i = 0, startitem = 0; kXR_int64 res = 0, bytesread = 0; if (buf) fCounters.ReadVRequests++; else fCounters.ReadVAsyncRequests++; while ( i < reqvect.GetSize() ) { // Here we have the sequence of fixed blocks to request // We want to request single readv reqs which // - are compliant with the max number of blocks the server supports // - do not request more than maxbytes bytes each kXR_int64 tmpbytes = 0; int maxchunkcnt = READV_MAXCHUNKS; if (EnvGetLong(NAME_MULTISTREAMCNT) > 0) maxchunkcnt = reqvect.GetSize() / EnvGetLong(NAME_MULTISTREAMCNT)+1; if (maxchunkcnt < 2) maxchunkcnt = 2; if (maxchunkcnt > READV_MAXCHUNKS) maxchunkcnt = READV_MAXCHUNKS; int chunkcnt = 0; while ( i < reqvect.GetSize() ) { if (chunkcnt >= maxchunkcnt) break; if (tmpbytes + reqvect[i].len > spltsize) break; tmpbytes += reqvect[i].len; chunkcnt++; i++; } if (i-startitem == 1) { if (buf) { // Synchronous fCounters.ReadVSubRequests++; fCounters.ReadVSubChunks++; fCounters.ReadVBytes += reqvect[startitem].len; res = Read(buf+bytesread, reqvect[startitem].offset, reqvect[startitem].len); } else { // Asynchronous, res stays the same fCounters.ReadVAsyncSubRequests++; fCounters.ReadVAsyncSubChunks++; fCounters.ReadVAsyncBytes += reqvect[startitem].len; Read_Async(reqvect[startitem].offset, reqvect[startitem].len, false); res = reqvect[startitem].len; } } else { if (buf) { res = XrdClientReadV::ReqReadV(fConnModule, fHandle, buf+bytesread, reqvect, startitem, i-startitem, fConnModule->GetParallelStreamToUse(reqsperstream) ); fCounters.ReadVSubRequests++; fCounters.ReadVSubChunks += i-startitem; fCounters.ReadVBytes += res; } else { res = XrdClientReadV::ReqReadV(fConnModule, fHandle, 0, reqvect, startitem, i-startitem, fConnModule->GetParallelStreamToUse(reqsperstream) ); fCounters.ReadVAsyncSubRequests++; fCounters.ReadVAsyncSubChunks += i-startitem; fCounters.ReadVAsyncBytes += res; } } // The next bunch of chunks to request starts from here startitem = i; if ( res < 0 ) break; bytesread += res; } if (!buf && !fConnModule->CacheWillFit(bytesread+bytesread/4)) { Info(XrdClientDebug::kUSERDEBUG, "ReadV", "Excessive async readv size " << bytesread+bytesread/4 << ". Fixing cache size." ); SetCacheParameters(bytesread, -1, -1); } // pos will indicate the size of the data read // Even if we were able to read only a part of the buffer !!! return bytesread; } //_____________________________________________________________________________ bool XrdClient::Write(const void *buf, long long offset, int len) { if (!IsOpen_wait()) { Error("WriteBuffer", "File not opened."); return FALSE; } // Set the max transaction duration fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT)); fCounters.WrittenBytes += len; fCounters.WriteRequests++; // Prepare request ClientRequest writeFileRequest; memset( &writeFileRequest, 0, sizeof(writeFileRequest) ); fConnModule->SetSID(writeFileRequest.header.streamid); writeFileRequest.write.requestid = kXR_write; memcpy( writeFileRequest.write.fhandle, fHandle, sizeof(fHandle) ); bool ret = false; if (!fUseCache) { // Silly situation but worth handling writeFileRequest.write.pathid = 0; writeFileRequest.write.dlen = len; writeFileRequest.write.offset = offset; ret = fConnModule->SendGenCommand(&writeFileRequest, (void *)buf, 0, 0, FALSE, (char *)"Write"); if (ret && fStatInfo.stated) fStatInfo.size = xrdmax(fStatInfo.size, offset + len); return ret; } // Soft checkpoint, we check just for timeouts in old outstanding write requests // An unrecoverable error in an old request gives no sense to continue here. // Rather unfortunate but happens. One more weird metaphor of life?!?!? if (!fConnModule->DoWriteSoftCheckPoint()) return false; fConnModule->RemoveDataFromCache(offset, offset+len-1, true); XrdClientVector rl; XrdClientMStream::SplitReadRequest(fConnModule, offset, len, rl); kXR_char *cbuf = (kXR_char *)buf; int writtenok = 0; for (int i = 0; i < rl.GetSize(); i++) { writeFileRequest.write.offset = rl[i].offset; writeFileRequest.write.dlen = rl[i].len; writeFileRequest.write.pathid = rl[i].streamtosend; // The req is sent only asynchronously. So, the only bottleneck here is the kernel // and its tcp buffer sizes... and the network of course. But beware of the crappy // default tcp settings of the various SLCs XReqErrorType b; int cnt = 0; do { b = fConnModule->WriteToServer_Async(&writeFileRequest, (kXR_char *)buf+(rl[i].offset-offset), rl[i].streamtosend); ret = (b == kOK); if (b != kNOMORESTREAMS) break; // There are no more slots for outstanding requests // Asking for a hard checkpoint is a good way to waste some time // and to wait for some slots to be free // The only drawback is that the mechanism needs enough memory to fill // the pipeline given by the network+server latency, or the max number of available slots if (!fConnModule->DoWriteHardCheckPoint()) break; } while (cnt < 10); if (b != kOK) { // We need to deal with errors while sending the request // So, if there is an error or timeout while sending the req, it has to be retried sync // in order to trigger immediately the normal retry mechanism, if needed // Try again the write op, but in sync mode writeFileRequest.write.pathid = 0; ret = fConnModule->SendGenCommand(&writeFileRequest, (kXR_char *)buf+(rl[i].offset-offset), 0, 0, FALSE, (char *)"Write"); if (!ret) break; } writtenok += rl[i].len; cbuf += rl[i].len; } if (ret && fStatInfo.stated) fStatInfo.size = xrdmax(fStatInfo.size, offset + writtenok); return ret; } //_____________________________________________________________________________ bool XrdClient::Sync() { // Flushes un-written data if (!IsOpen_wait()) { Error("Sync", "File not opened."); return FALSE; } if (!fConnModule->DoWriteHardCheckPoint()) return false; // Set the max transaction duration fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT)); // Prepare request ClientRequest flushFileRequest; memset( &flushFileRequest, 0, sizeof(flushFileRequest) ); fConnModule->SetSID(flushFileRequest.header.streamid); flushFileRequest.sync.requestid = kXR_sync; memcpy(flushFileRequest.sync.fhandle, fHandle, sizeof(fHandle)); flushFileRequest.sync.dlen = 0; return fConnModule->SendGenCommand(&flushFileRequest, 0, 0, 0, FALSE, (char *)"Sync"); } //_____________________________________________________________________________ bool XrdClient::TryOpen(kXR_unt16 mode, kXR_unt16 options, bool doitparallel) { int thrst = 0; fOpenPars.inprogress = true; if (doitparallel) { for (int i = 0; i < DFLT_MAXCONCURRENTOPENS; i++) { fConcOpenSem.Wait(); fOpenerTh = new XrdClientThread(FileOpenerThread); thrst = fOpenerTh->Run(this); if (!thrst) { // The thread start seems OK. This open will go in parallel //if (fOpenerTh->Detach()) // Error("XrdClient", "Thread detach failed. Low system resources?"); return true; } // Note: the Post() here is intentionally missing. Error("XrdClient", "Parallel open thread start failed. Low system" " resources? Res=" << thrst << " Count=" << i); delete fOpenerTh; fOpenerTh = 0; } // If we are here it seems that this machine cannot start open threads at all // In this desperate situation we try to go sync anyway. for (int i = 0; i < DFLT_MAXCONCURRENTOPENS; i++) fConcOpenSem.Post(); Error("XrdClient", "All the parallel open thread start attempts failed." " Desperate situation. Going sync."); doitparallel = false; } // First attempt to open a remote file bool lowopenRes = LowOpen(fUrl.File.c_str(), mode, options); if (lowopenRes) { // And here we fire up the needed parallel streams XrdClientMStream::EstablishParallelStreams(fConnModule); int retc; if (!fConnModule->IsConnected()) { fOpenPars.opened = false; retc = false; } else retc = true; TerminateOpenAttempt(); return retc; } //-------------------------------------------------------------------------- // We have failed during authentication, normally this is fatal, but // we can check if we have seen a meta manager so that we could ask it to // send us to another cluster //-------------------------------------------------------------------------- if( fConnModule->GetMetaUrl() && ((fConnModule->GetCurrentUrl().Host != fConnModule->GetMetaUrl()->Host) || (fConnModule->GetCurrentUrl().Port != fConnModule->GetMetaUrl()->Port)) ) { while( fConnModule->LastServerError.errnum == kXR_NotAuthorized ) { if( fConnModule->GetRedirCnt() >= fConnModule->GetMaxRedirCnt() ) break; //---------------------------------------------------------------------- // We have seen a manager that is not the same as the meta manager, // so we need to exclude it //---------------------------------------------------------------------- if( fConnModule->GetLBSUrl() && ((fConnModule->GetLBSUrl()->Host != fConnModule->GetMetaUrl()->Host) || (fConnModule->GetLBSUrl()->Port != fConnModule->GetMetaUrl()->Port) ) ) { fExcludedHosts.push_back( fConnModule->GetLBSUrl()->Host.c_str() ); } //---------------------------------------------------------------------- // We also exclude the current host if different from the manager, // just in case //---------------------------------------------------------------------- if( fConnModule->GetLBSUrl() && ((fConnModule->GetCurrentUrl().Host != fConnModule->GetLBSUrl()->Host) || (fConnModule->GetCurrentUrl().Port != fConnModule->GetLBSUrl()->Port) ) ) { fExcludedHosts.push_back( fConnModule->GetCurrentUrl().Host.c_str() ); } //---------------------------------------------------------------------- // Ask the metamanager to redirect us to another host //---------------------------------------------------------------------- std::string excludedHosts = "&tried="; std::vector::iterator it; for( it = fExcludedHosts.begin(); it != fExcludedHosts.end(); ++it ) { excludedHosts += *it; excludedHosts += ","; } Info( XrdClientDebug::kUSERDEBUG, "Open", "Back to " << fConnModule->GetMetaUrl()->Host << " " << fConnModule->GetMetaUrl()->Port << ". Refreshing cache. Opaque info: " << excludedHosts ); fConnModule->Disconnect( true ); if( (fConnModule->GoToMetaManager() == kOK) && LowOpen( fUrl.File.c_str(), mode, options | kXR_refresh, (char *)excludedHosts.c_str() ) ) { XrdClientMStream::EstablishParallelStreams(fConnModule); TerminateOpenAttempt(); return true; } } } // If the open request failed for the error "file not found" proceed, // otherwise return FALSE if ( (fConnModule->LastServerResp.status != kXR_error) || ((fConnModule->LastServerResp.status == kXR_error) && (fConnModule->LastServerError.errnum != kXR_NotFound)) ){ TerminateOpenAttempt(); return FALSE; } // If connected to a host saying "File not Found" or similar then... // If we are currently connected to a host which is different // from the one we formerly connected, then we resend the request // specifyng the supposed failing server as opaque info if (fConnModule->GetLBSUrl() && ( (fConnModule->GetCurrentUrl().Host != fConnModule->GetLBSUrl()->Host) || (fConnModule->GetCurrentUrl().Port != fConnModule->GetLBSUrl()->Port) ) ) { XrdOucString opinfo; opinfo = "&tried=" + fConnModule->GetCurrentUrl().Host; Info(XrdClientDebug::kUSERDEBUG, "Open", "Back to " << fConnModule->GetLBSUrl()->Host << ". Refreshing cache. Opaque info: " << opinfo); // First disconnect the current logical connection (otherwise spurious // connection will stay around and create problems with processing of // unsolicited messages) fConnModule->Disconnect(FALSE); if ( (fConnModule->GoToAnotherServer(*fConnModule->GetLBSUrl()) == kOK) && LowOpen(fUrl.File.c_str(), mode, options | kXR_refresh, (char *)opinfo.c_str() ) ) { // And here we fire up the needed parallel streams XrdClientMStream::EstablishParallelStreams(fConnModule); TerminateOpenAttempt(); return TRUE; } else { Error("Open", "Error opening the file."); TerminateOpenAttempt(); return FALSE; } } TerminateOpenAttempt(); return FALSE; } //_____________________________________________________________________________ bool XrdClient::LowOpen(const char *file, kXR_unt16 mode, kXR_unt16 options, char *additionalquery) { // Low level Open method XrdOucString finalfilename(file); if ((fConnModule->fRedirOpaque.length() > 0) || additionalquery) { finalfilename += "?"; if (fConnModule->fRedirOpaque.length() > 0) finalfilename += fConnModule->fRedirOpaque; if (additionalquery) finalfilename += additionalquery; } // Send a kXR_open request in order to open the remote file ClientRequest openFileRequest; char buf[1024]; struct ServerResponseBody_Open *openresp = (struct ServerResponseBody_Open *)buf; memset(&openFileRequest, 0, sizeof(openFileRequest)); fConnModule->SetSID(openFileRequest.header.streamid); openFileRequest.header.requestid = kXR_open; openFileRequest.open.options = options | kXR_retstat; // Set the open mode field openFileRequest.open.mode = mode; // Set the length of the data (in this case data describes the path and // file name) openFileRequest.open.dlen = finalfilename.length(); // Send request to server and receive response bool resp = fConnModule->SendGenCommand(&openFileRequest, (const void *)finalfilename.c_str(), 0, openresp, false, (char *)"Open"); if (resp && (fConnModule->LastServerResp.status == 0)) { // Get the file handle to use for future read/write... if (fConnModule->LastServerResp.dlen >= (kXR_int32)sizeof(fHandle)) { memcpy( fHandle, openresp->fhandle, sizeof(fHandle) ); fOpenPars.opened = TRUE; fOpenPars.options = options; fOpenPars.mode = mode; } else Error("Open", "Server did not return a filehandle. Protocol error."); if (fConnModule->LastServerResp.dlen > 12) { // Get the stats Info(XrdClientDebug::kHIDEBUG, "Open", "Returned stats=" << ((char *)openresp + sizeof(struct ServerResponseBody_Open))); sscanf((char *)openresp + sizeof(struct ServerResponseBody_Open), "%ld %lld %ld %ld", &fStatInfo.id, &fStatInfo.size, &fStatInfo.flags, &fStatInfo.modtime); fStatInfo.stated = true; } } return fOpenPars.opened; } //_____________________________________________________________________________ bool XrdClient::Stat(struct XrdClientStatInfo *stinfo, bool force) { if (!force && fStatInfo.stated) { if (stinfo) memcpy(stinfo, &fStatInfo, sizeof(fStatInfo)); return TRUE; } if (!IsOpen_wait()) { Error("Stat", "File not opened."); return FALSE; } if (force && !Sync()) return false; // asks the server for stat file informations ClientRequest statFileRequest; memset(&statFileRequest, 0, sizeof(ClientRequest)); fConnModule->SetSID(statFileRequest.header.streamid); statFileRequest.stat.requestid = kXR_stat; memset(statFileRequest.stat.reserved, 0, sizeof(statFileRequest.stat.reserved)); statFileRequest.stat.dlen = fUrl.File.length(); char fStats[2048]; memset(fStats, 0, 2048); bool ok = fConnModule->SendGenCommand(&statFileRequest, (const char*)fUrl.File.c_str(), 0, fStats , FALSE, (char *)"Stat"); if (ok && (fConnModule->LastServerResp.status == 0) ) { Info(XrdClientDebug::kHIDEBUG, "Stat", "Returned stats=" << fStats); sscanf(fStats, "%ld %lld %ld %ld", &fStatInfo.id, &fStatInfo.size, &fStatInfo.flags, &fStatInfo.modtime); if (stinfo) memcpy(stinfo, &fStatInfo, sizeof(fStatInfo)); fStatInfo.stated = true; } return ok; } //_____________________________________________________________________________ bool XrdClient::Close() { if (!IsOpen_wait()) { Info(XrdClientDebug::kUSERDEBUG, "Close", "File not opened."); return TRUE; } ClientRequest closeFileRequest; // Set the max transaction duration fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT)); memset(&closeFileRequest, 0, sizeof(closeFileRequest) ); fConnModule->SetSID(closeFileRequest.header.streamid); closeFileRequest.close.requestid = kXR_close; memcpy(closeFileRequest.close.fhandle, fHandle, sizeof(fHandle) ); closeFileRequest.close.dlen = 0; // Use the sync one only if the file was opened for writing // To enforce the server side correct data flushing bool status1 = true; if (IsOpenedForWrite()) if( !fConnModule->DoWriteHardCheckPoint() ) status1 = false; bool status2 = fConnModule->SendGenCommand(&closeFileRequest, 0, 0, 0 , FALSE, (char *)"Close"); // No file is opened for now fOpenPars.opened = FALSE; fConnModule->Disconnect( false ); return status1 && status2; } //_____________________________________________________________________________ bool XrdClient::OpenFileWhenRedirected(char *newfhandle, bool &wasopen) { // Called by the comm module when it needs to reopen a file // after a redir wasopen = fOpenPars.opened; if (!fOpenPars.opened) return TRUE; fOpenPars.opened = FALSE; Info(XrdClientDebug::kHIDEBUG, "OpenFileWhenRedirected", "Trying to reopen the same file." ); kXR_unt16 options = fOpenPars.options; if (fOpenPars.options & kXR_delete) { Info(XrdClientDebug::kHIDEBUG, "OpenFileWhenRedirected", "Stripping off the 'delete' option." ); options &= ~kXR_delete; options |= kXR_open_updt; } if (fOpenPars.options & kXR_new) { Info(XrdClientDebug::kHIDEBUG, "OpenFileWhenRedirected", "Stripping off the 'new' option." ); options &= ~kXR_new; options |= kXR_open_updt; } if ( TryOpen(fOpenPars.mode, options, false) ) { fOpenPars.opened = TRUE; Info(XrdClientDebug::kHIDEBUG, "OpenFileWhenRedirected", "Open successful." ); memcpy(newfhandle, fHandle, sizeof(fHandle)); return TRUE; } else { Error("OpenFileWhenRedirected", "File open failed."); return FALSE; } } //_____________________________________________________________________________ bool XrdClient::Copy(const char *localpath) { if (!IsOpen_wait()) { Error("Copy", "File not opened."); return FALSE; } Stat(0); int f = open(localpath, O_CREAT | O_RDWR, 0); if (f < 0) { Error("Copy", "Error opening local file."); return FALSE; } void *buf = malloc(100000); long long offs = 0; int nr = 1; while ((nr > 0) && (offs < fStatInfo.size)) if ( (nr = Read(buf, offs, 100000)) ) offs += write(f, buf, nr); close(f); free(buf); return TRUE; } //_____________________________________________________________________________ UnsolRespProcResult XrdClient::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *sender, XrdClientMessage *unsolmsg) { // We are here if an unsolicited response comes from a logical conn // The response comes in the form of a TXMessage *, that must NOT be // destroyed after processing. It is destroyed by the first sender. // Remember that we are in a separate thread, since unsolicited // responses are asynchronous by nature. if ( unsolmsg->GetStatusCode() != XrdClientMessage::kXrdMSC_ok ) { Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg", "Incoming unsolicited communication error message." ); } else { Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg", "Incoming unsolicited response from streamid " << unsolmsg->HeaderSID() ); } // Local processing .... if (unsolmsg->IsAttn()) { struct ServerResponseBody_Attn *attnbody; attnbody = (struct ServerResponseBody_Attn *)unsolmsg->GetData(); int actnum = (attnbody) ? (attnbody->actnum) : 0; // "True" async resp is processed here switch (actnum) { case kXR_asyncdi: // Disconnection + delayed reconnection request struct ServerResponseBody_Attn_asyncdi *di; di = (struct ServerResponseBody_Attn_asyncdi *)unsolmsg->GetData(); // Explicit redirection request if (di) { Info(XrdClientDebug::kUSERDEBUG, "ProcessUnsolicitedMsg", "Requested Disconnection + Reconnect in " << ntohl(di->wsec) << " seconds."); fConnModule->SetRequestedDestHost((char *)fUrl.Host.c_str(), fUrl.Port); fConnModule->SetREQDelayedConnectState(ntohl(di->wsec)); } // Other objects may be interested in this async resp return kUNSOL_CONTINUE; break; case kXR_asyncrd: // Redirection request struct ServerResponseBody_Attn_asyncrd *rd; rd = (struct ServerResponseBody_Attn_asyncrd *)unsolmsg->GetData(); // Explicit redirection request if (rd && (strlen(rd->host) > 0)) { Info(XrdClientDebug::kUSERDEBUG, "ProcessUnsolicitedMsg", "Requested redir to " << rd->host << ":" << ntohl(rd->port)); fConnModule->SetRequestedDestHost(rd->host, ntohl(rd->port)); } // Other objects may be interested in this async resp return kUNSOL_CONTINUE; break; case kXR_asyncwt: // Puts the client in wait state struct ServerResponseBody_Attn_asyncwt *wt; wt = (struct ServerResponseBody_Attn_asyncwt *)unsolmsg->GetData(); if (wt) { Info(XrdClientDebug::kUSERDEBUG, "ProcessUnsolicitedMsg", "Pausing client for " << ntohl(wt->wsec) << " seconds."); fConnModule->SetREQPauseState(ntohl(wt->wsec)); } // Other objects may be interested in this async resp return kUNSOL_CONTINUE; break; case kXR_asyncgo: // Resumes from pause state Info(XrdClientDebug::kUSERDEBUG, "ProcessUnsolicitedMsg", "Resuming from pause."); fConnModule->SetREQPauseState(0); // Other objects may be interested in this async resp return kUNSOL_CONTINUE; break; case kXR_asynresp: // A response to a request which got a kXR_waitresp as a response // We pass it direcly to the connmodule for processing // The processing will tell if the streamid matched or not, // in order to stop further processing return fConnModule->ProcessAsynResp(unsolmsg); break; default: Info(XrdClientDebug::kUSERDEBUG, "ProcessUnsolicitedMsg", "Empty message"); // Propagate the message return kUNSOL_CONTINUE; } // switch } else // Let's see if the message is a communication error message if (unsolmsg->GetStatusCode() != XrdClientMessage::kXrdMSC_ok){ // This is a low level error. The outstanding things have to be terminated // Awaken all the waiting threads, some of them may be interested fReadWaitData->Broadcast(); TerminateOpenAttempt(); return fConnModule->ProcessAsynResp(unsolmsg); } else // Let's see if we are receiving the response to an async read/write request if ( ConnectionManager->SidManager()->JoinedSids(fConnModule->GetStreamID(), unsolmsg->HeaderSID()) ) { struct SidInfo *si = ConnectionManager->SidManager()->GetSidInfo(unsolmsg->HeaderSID()); if (!si) { Error("ProcessUnsolicitedMsg", "Orphaned streamid detected: " << unsolmsg->HeaderSID()); return kUNSOL_DISPOSE; } ClientRequest *req = &(si->outstandingreq); Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg", "Processing async response from streamid " << unsolmsg->HeaderSID() << " father=" << si->fathersid ); // We are interested in data, not errors here... if ( (unsolmsg->HeaderStatus() == kXR_oksofar) || (unsolmsg->HeaderStatus() == kXR_ok) ) { switch (req->header.requestid) { case kXR_read: { long long offs = req->read.offset + si->reqbyteprogress; Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg", "Putting kXR_read data into cache. Offset=" << offs << " len " << unsolmsg->fHdr.dlen); { // Keep in sync with the cache lookup XrdSysCondVarHelper cndh(fReadWaitData); // To compute the end offset of the block we have to take 1 from the size! fConnModule->SubmitDataToCache(unsolmsg, offs, offs + unsolmsg->fHdr.dlen - 1); } si->reqbyteprogress += unsolmsg->fHdr.dlen; // Awaken all the waiting threads, some of them may be interested fReadWaitData->Broadcast(); if (unsolmsg->HeaderStatus() == kXR_ok) return kUNSOL_DISPOSE; else return kUNSOL_KEEP; break; } case kXR_readv: { Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg", "Putting kXR_readV data into cache. " << " len " << unsolmsg->fHdr.dlen); { // Keep in sync with the cache lookup XrdSysCondVarHelper cndh(fReadWaitData); XrdClientReadV::SubmitToCacheReadVResp(fConnModule, (char *)unsolmsg->DonateData(), unsolmsg->fHdr.dlen); } // Awaken all the sleepers. Some of them may be interested fReadWaitData->Broadcast(); if (unsolmsg->HeaderStatus() == kXR_ok) return kUNSOL_DISPOSE; else return kUNSOL_KEEP; break; } case kXR_write: { Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg", "Got positive ack for write req " << req->header.dlen << "@" << req->write.offset); // the corresponding cache blk has to be unpinned, and eventually // purged fConnModule->UnPinCacheBlk(req->write.offset, req->write.offset+req->header.dlen); // A bit cpu consuming... need to optimize this if (EnvGetLong(NAME_PURGEWRITTENBLOCKS)) fConnModule->RemoveDataFromCache(req->write.offset, req->write.offset+req->header.dlen-1, true); // This streamid will be released return kUNSOL_DISPOSE; } } } // if oksofar or ok else { // And here we treat the errors which can be fatal or just ugly to ignore // even if the strategy should be completely async switch (req->header.requestid) { case kXR_read: { // We invalidate the whole request in the cache Error("ProcessUnsolicitedMsg", "Got a kxr_read error. Req offset=" << req->read.offset << " len=" << req->read.rlen); { // Keep in sync with the cache lookup XrdSysCondVarHelper cndh(fReadWaitData); // To compute the end offset of the block we have to take 1 from the size! // Note that this is an error, we try to invalidate everythign which // can be related to this chunk fConnModule->RemoveDataFromCache(req->read.offset, req->read.offset + req->read.rlen - 1, true); } // Print out the error information, as received by the server struct ServerResponseBody_Error *body_err; body_err = (struct ServerResponseBody_Error *)(unsolmsg->GetData()); if (body_err) Info(XrdClientDebug::kNODEBUG, "ProcessUnsolicitedMsg", "Server declared: " << (const char*)body_err->errmsg << "(error code: " << ntohl(body_err->errnum) << ")"); // Save the last error received memset(&fConnModule->LastServerError, 0, sizeof(fConnModule->LastServerError)); memcpy(&fConnModule->LastServerError, body_err, xrdmin(sizeof(fConnModule->LastServerError), (unsigned)unsolmsg->DataLen()) ); fConnModule->LastServerError.errnum = ntohl(body_err->errnum); // Awaken all the waiting threads, some of them may be interested fReadWaitData->Broadcast(); // Other clients might be interested return kUNSOL_CONTINUE; break; } case kXR_write: { Error("ProcessUnsolicitedMsg", "Got a kxr_write error. Req offset=" << req->write.offset << " len=" << req->write.dlen); // Print out the error information, as received by the server struct ServerResponseBody_Error *body_err; body_err = (struct ServerResponseBody_Error *)(unsolmsg->GetData()); if (body_err) { Info(XrdClientDebug::kNODEBUG, "ProcessUnsolicitedMsg", "Server declared: " << (const char*)body_err->errmsg << "(error code: " << ntohl(body_err->errnum) << ") writing " << req->write.dlen << "@" << req->write.offset); // Save the last error received memset(&fConnModule->LastServerError, 0, sizeof(fConnModule->LastServerError)); memcpy(&fConnModule->LastServerError, body_err, xrdmin(sizeof(fConnModule->LastServerError), (unsigned)unsolmsg->DataLen()) ); fConnModule->LastServerError.errnum = ntohl(body_err->errnum); // Mark the request as an error. It will be catched by the next write soft checkpoint ConnectionManager->SidManager()->ReportSidResp(unsolmsg->HeaderSID(), unsolmsg->GetStatusCode(), ntohl(body_err->errnum), body_err->errmsg); } else ConnectionManager->SidManager()->ReportSidResp(unsolmsg->HeaderSID(), unsolmsg->GetStatusCode(), kXR_noErrorYet, 0); // Awaken all the waiting threads, some of them may be interested fReadWaitData->Broadcast(); // This streamid must be kept as pending. It will be handled by the subsequent // write checkpoint return kUNSOL_KEEP; break; } } // switch } // else } return kUNSOL_CONTINUE; } XReqErrorType XrdClient::Read_Async(long long offset, int len, bool updatecounters) { if (!IsOpen_wait()) { Error("Read", "File not opened."); return kGENERICERR; } Stat(0); len = xrdmin(fStatInfo.size - offset, len); if (len <= 0) return kOK; if (fUseCache) fConnModule->SubmitPlaceholderToCache(offset, offset+len-1); else return kOK; if (updatecounters) { fCounters.ReadAsyncRequests++; fCounters.ReadAsyncBytes += len; } // Prepare request ClientRequest readFileRequest; memset( &readFileRequest, 0, sizeof(readFileRequest) ); // No need to initialize the streamid, it will be filled by XrdClientConn readFileRequest.read.requestid = kXR_read; memcpy( readFileRequest.read.fhandle, fHandle, sizeof(fHandle) ); readFileRequest.read.offset = offset; readFileRequest.read.rlen = len; readFileRequest.read.dlen = 0; Info(XrdClientDebug::kHIDEBUG, "Read_Async", "Requesting to read " << readFileRequest.read.rlen << " bytes of data at offset " << readFileRequest.read.offset); // This request might be splitted and distributed through multiple streams XrdClientVector chunks; XReqErrorType ok = kOK; if (XrdClientMStream::SplitReadRequest(fConnModule, offset, len, chunks) ) { for (int i = 0; i < chunks.GetSize(); i++) { XrdClientMStream::ReadChunk *c; read_args args; memset(&args, 0, sizeof(args)); c = &chunks[i]; args.pathid = c->streamtosend; Info(XrdClientDebug::kHIDEBUG, "Read_Async", "Requesting pathid " << c->streamtosend); readFileRequest.read.offset = c->offset; readFileRequest.read.rlen = c->len; if (args.pathid != 0) { readFileRequest.read.dlen = sizeof(read_args); ok = fConnModule->WriteToServer_Async(&readFileRequest, &args, 0); } else { readFileRequest.read.dlen = 0; ok = fConnModule->WriteToServer_Async(&readFileRequest, 0, 0); } if (ok != kOK) break; } } else return (fConnModule->WriteToServer_Async(&readFileRequest, 0)); return ok; } //_____________________________________________________________________________ // Truncates the open file at a specified length bool XrdClient::Truncate(long long len) { if (!IsOpen_wait()) { Info(XrdClientDebug::kUSERDEBUG, "Truncate", "File not opened."); return true; } ClientRequest truncFileRequest; memset(&truncFileRequest, 0, sizeof(truncFileRequest) ); fConnModule->SetSID(truncFileRequest.header.streamid); truncFileRequest.truncate.requestid = kXR_truncate; memcpy(truncFileRequest.truncate.fhandle, fHandle, sizeof(fHandle) ); truncFileRequest.truncate.offset = len; bool ok = fConnModule->SendGenCommand(&truncFileRequest, 0, 0, 0 , FALSE, (char *)"Truncate"); if (ok && fStatInfo.stated) fStatInfo.size = len; return ok; } //_____________________________________________________________________________ // Sleeps on a condvar which is signalled when a new async block arrives void XrdClient::WaitForNewAsyncData() { XrdSysCondVarHelper cndh(fReadWaitData); fReadWaitData->Wait(); } //_____________________________________________________________________________ bool XrdClient::UseCache(bool u) { // Set use of cache flag after checking if the requested value make sense. // Returns the previous value to allow quick toggling of the flag. bool r = fUseCache; if (!u) { fUseCache = false; } else { int size; long long bytessubmitted, byteshit, misscount, readreqcnt; float missrate, bytesusefulness; if ( fConnModule && fConnModule->GetCacheInfo(size, bytessubmitted, byteshit, misscount, missrate, readreqcnt, bytesusefulness) && size ) fUseCache = true; } // Return the previous setting return r; } // To set at run time the cache/readahead parameters for this instance only // If a parameter is < 0 then it's left untouched. // To simply enable/disable the caching, just use UseCache(), not this function void XrdClient::SetCacheParameters(int CacheSize, int ReadAheadSize, int RmPolicy) { if (fConnModule) { if (CacheSize >= 0) fConnModule->SetCacheSize(CacheSize); if (RmPolicy >= 0) fConnModule->SetCacheRmPolicy(RmPolicy); } if ((ReadAheadSize >= 0) && fReadAheadMgr) fReadAheadMgr->SetRASize(ReadAheadSize); } // To enable/disable different read ahead strategies. Defined in XrdClientReadAhead.hh void XrdClient::SetReadAheadStrategy(int strategy) { if (!fConnModule) return; if (fReadAheadMgr && fReadAheadMgr->GetCurrentStrategy() != (XrdClientReadAheadMgr::XrdClient_RAStrategy)strategy) { delete fReadAheadMgr; fReadAheadMgr = 0; } if (!fReadAheadMgr) fReadAheadMgr = XrdClientReadAheadMgr::CreateReadAheadMgr((XrdClientReadAheadMgr::XrdClient_RAStrategy)strategy); } // To enable the trimming of the blocks to read. Blocksize will be rounded to a multiple of 512. // Each read request will have the offset and length aligned with a multiple of blocksize // This strategy is similar to a read ahead, but not quite. It anyway needs to have the cache enabled to work. // Here we see it as a transformation of the stream of the read accesses to request. void XrdClient::SetBlockReadTrimming(int blocksize) { blocksize = blocksize >> 9; blocksize = blocksize << 9; if (blocksize < 512) blocksize = 512; fReadTrimBlockSize = blocksize; } bool XrdClient::GetCacheInfo( // The actual cache size int &size, // The number of bytes submitted since the beginning long long &bytessubmitted, // The number of bytes found in the cache (estimate) long long &byteshit, // The number of reads which did not find their data // (estimate) long long &misscount, // miss/totalreads ratio (estimate) float &missrate, // number of read requests towards the cache long long &readreqcnt, // ratio between bytes found / bytes submitted float &bytesusefulness ) { if (!fConnModule) return false; if (!fConnModule->GetCacheInfo(size, bytessubmitted, byteshit, misscount, missrate, readreqcnt, bytesusefulness)) return false; return true; } // Returns client-level information about the activity performed up to now bool XrdClient::GetCounters( XrdClientCounters *cnt ) { fCounters.ReadMisses = fCounters.ReadRequests-fCounters.ReadHits; fCounters.ReadMissRate = ( fCounters.ReadRequests ? (float)fCounters.ReadMisses / fCounters.ReadRequests : 0 ); memcpy( cnt, &fCounters, sizeof(fCounters)); return true; } void XrdClient:: RemoveAllDataFromCache() { if (fConnModule) fConnModule->RemoveAllDataFromCache();} void XrdClient::RemoveDataFromCache(long long begin_offs, long long end_offs, bool remove_overlapped) { if (fConnModule) fConnModule->RemoveDataFromCache(begin_offs, end_offs, remove_overlapped); } void XrdClient::PrintCounters() { if (DebugLevel() < XrdClientDebug::kUSERDEBUG) return; XrdClientCounters cnt; GetCounters(&cnt); printf("XrdClient counters:\n");; printf(" ReadBytes: %lld\n", cnt.ReadBytes ); printf(" WrittenBytes: %lld\n", cnt.WrittenBytes ); printf(" WriteRequests: %lld\n", cnt.WriteRequests ); printf(" ReadRequests: %lld\n", cnt.ReadRequests ); printf(" ReadMisses: %lld\n", cnt.ReadMisses ); printf(" ReadHits: %lld\n", cnt.ReadHits ); printf(" ReadMissRate: %f\n", cnt.ReadMissRate ); printf(" ReadVRequests: %lld\n", cnt.ReadVRequests ); printf(" ReadVSubRequests: %lld\n", cnt.ReadVSubRequests ); printf(" ReadVSubChunks: %lld\n", cnt.ReadVSubChunks ); printf(" ReadVBytes: %lld\n", cnt.ReadVBytes ); printf(" ReadVAsyncRequests: %lld\n", cnt.ReadVAsyncRequests ); printf(" ReadVAsyncSubRequests: %lld\n", cnt.ReadVAsyncSubRequests ); printf(" ReadVAsyncSubChunks: %lld\n", cnt.ReadVAsyncSubChunks ); printf(" ReadVAsyncBytes: %lld\n", cnt.ReadVAsyncBytes ); printf(" ReadAsyncRequests: %lld\n", cnt.ReadAsyncRequests ); printf(" ReadAsyncBytes: %lld\n\n", cnt.ReadAsyncBytes ); }