/******************************************************************************/ /* */ /* X r d c p X t r e m e R e a d . c c */ /* */ /* Author: Fabrizio Furano (CERN, 2009) */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ ////////////////////////////////////////////////////////////////////////// // // // Utility classes handling coordinated parallel reads from multiple // // XrdClient instances // // // ////////////////////////////////////////////////////////////////////////// #include "XrdClient/XrdcpXtremeRead.hh" #include "XrdClient/XrdClientAdmin.hh" XrdXtRdFile::XrdXtRdFile(int blksize, long long filesize) { blocks = 0; clientidxcnt = 0; freeblks = 0; doneblks = 0; freeblks = nblks = (filesize + blksize - 1) / blksize; blocks = new XrdXtRdBlkInfo[nblks]; // Init the list of blocks long long ofs = 0; for (int i = 0; i < nblks; i++) { blocks[i].offs = ofs; blocks[i].len = xrdmax(0, xrdmin(filesize, ofs+blksize) - ofs); ofs += blocks[i].len; } } XrdXtRdFile::~XrdXtRdFile() { delete []blocks; } int XrdXtRdFile::GimmeANewClientIdx() { XrdSysMutexHelper m(mtx); return ++clientidxcnt; } int XrdXtRdFile::GetBlkToPrefetch(int fromidx, int clientidx, XrdXtRdBlkInfo *&blkreadonly) { // Considering fromidx as a starting point in the blocks array, // finds a block which is worth prefetching // If there are free blocks it's trivial // Otherwise it will be stolen from other readers which are clearly late XrdSysMutexHelper m(mtx); // Find a non assigned blk for (int i = 0; i < nblks; i++) { int pos = (fromidx + i) % nblks; // Find a non assigned blk if (blocks[pos].requests.GetSize() == 0) { blocks[pos].requests.Push_back(clientidx); blocks[pos].lastrequested = time(0); blkreadonly = &blocks[pos]; return pos; } } // Steal an outstanding missing block, even if in progress // The outcome of this is that, at the end, all thethe fastest free clients will // ask for the missing blks // The only thing to avoid is that a client asks twice the same blk for itself for (int i = nblks; i > 0; i--) { int pos = (fromidx + i) % nblks; // Find a non finished blk to steal if (!blocks[pos].done && !blocks[pos].AlreadyRequested(clientidx) && (blocks[pos].requests.GetSize() < 3) ) { blocks[pos].requests.Push_back(clientidx); blkreadonly = &blocks[pos]; blocks[pos].lastrequested = time(0); return pos; } } // No blocks to request or steal... probably everything's finished return -1; } int XrdXtRdFile::GetBlkToRead(int fromidx, int clientidx, XrdXtRdBlkInfo *&blkreadonly) { // Get the next already prefetched block, now we want to get its content XrdSysMutexHelper m(mtx); for (int i = 0; i < nblks; i++) { int pos = (fromidx + i) % nblks; if (!blocks[pos].done && blocks[pos].AlreadyRequested(clientidx)) { blocks[pos].lastrequested = time(0); blkreadonly = &blocks[pos]; return pos; } } return -1; } int XrdXtRdFile::MarkBlkAsRead(int blkidx) { XrdSysMutexHelper m(mtx); int reward = 0; // If the block was stolen by somebody else then the reward is negative if (blocks[blkidx].done) reward = -1; if (!blocks[blkidx].done) { doneblks++; if (blocks[blkidx].requests.GetSize() > 1) reward = 1; } blocks[blkidx].done = true; return reward; } int XrdXtRdFile::GetListOfSources(XrdClient *ref, XrdOucString xtrememgr, XrdClientVector &clients, int maxSources) { // Exploit Locate in order to find as many sources as possible. // Make sure that ref appears once and only once // Instantiate and open the relative client instances XrdClientVector hosts; if (xtrememgr == "") return 0; // In the simple case the xtrememgr is just the host of the original url. if (!xtrememgr.beginswith("root://") && !xtrememgr.beginswith("xroot://")) { // Create an acceptable xrootd url XrdOucString loc2; loc2 = "root://"; loc2 += xtrememgr; loc2 += "/xyz"; xtrememgr = loc2; } XrdClientAdmin adm(xtrememgr.c_str()); if (!adm.Connect()) return 0; int locateok = adm.Locate((kXR_char *)ref->GetCurrentUrl().File.c_str(), hosts, kXR_nowait); if (!locateok || !hosts.GetSize()) return 0; if (maxSources > hosts.GetSize()) maxSources = hosts.GetSize(); // Here we have at least a result... hopefully bool found = false; for (int i = 0; i < maxSources; i++) if (ref->GetCurrentUrl().HostWPort == (const char *)(hosts[i].Location)) { found = true; break; } // Now initialize the clients and start the parallel opens for (int i = 0; i < maxSources; i++) { XrdOucString loc; loc = "root://"; loc += (const char *)hosts[i].Location; loc += "/"; loc += ref->GetCurrentUrl().File; cout << "Source #" << i+1 << " " << loc << endl; XrdClient *cli = new XrdClient(loc.c_str()); if (cli) { clients.Push_back(cli); } } // Eventually add the ref client to the vector if (!found && ref) clients.Push_back(ref); return clients.GetSize(); }