/******************************************************************************/
/* */
/* T e s t X r d C l i e n t _ r e a d . c c */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include "XrdClient/XrdClient.hh"
#include "XrdClient/XrdClientEnv.hh"
#include "XrdSys/XrdSysHeaders.hh"
#include "XrdClient/XrdClientCallback.hh"
#include <fstream>
#include <vector>
#include <string>
#include <sys/time.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
// Callback object handed to the XrdClient instance created in main() for the
// single-url mode. Its only job is to trace the outcome of the open request
// on the standard output.
class MyXrdClientCallback: public XrdClientCallback {
private:
   // Prints the 'res' flag received from the client. The client pointer and
   // the opaque callback argument are not used by this test.
   // NOTE(review): presumably invoked by XrdClient when the asynchronous open
   // terminates - confirm against XrdClientCallback.hh.
   virtual void OpenComplete(XrdClientAbs *clientP, void *cbArg, bool res) {
      (void)clientP;
      (void)cbArg;
      cout << "OpenComplete! res:" << res << endl;
   }
};
// Arguments passed to every XrdClient::Open() call issued by main().
// open_mode carries the kXR_ur and kXR_uw mode bits; open_opts starts empty
// and main() ORs further option bits into it depending on the selected
// test style.
kXR_unt16 open_mode = kXR_ur | kXR_uw;
kXR_unt16 open_opts = 0;
// Fill offs[]/lens[] with up to maxnread read requests taken from the
// standard input. Each request is a "<length> <offset>" pair of integers.
//
//   offs, lens   output arrays, each with room for at least maxnread items
//   maxnread     maximum number of requests to store
//   totalbytes   accumulator, incremented by the length of every accepted
//                request
//
// Pairs with a non positive length or a negative offset are silently
// discarded. Returns the number of requests stored; a value lower than
// maxnread (possibly 0) means that the input is exhausted or unparsable.
int ReadSome(kXR_int64 *offs, kXR_int32 *lens, int maxnread, long long &totalbytes) {

   for (int i = 0; i < maxnread;) {

      lens[i] = -1;
      offs[i] = -1;

      // Stop on ANY extraction failure, not only on end of file: a malformed
      // token sets failbit without eofbit, and testing eof() alone would
      // make this loop spin forever without consuming any input.
      if (!(cin >> lens[i] >> offs[i])) return i;

      if ((lens[i] > 0) && (offs[i] >= 0)) {
         totalbytes += lens[i];
         i++;
      }
   }

   return maxnread;
}
// Waste cpu cycles for msdelay milliseconds
// Simulates the application "think time": keeps the cpu busy (it does not
// sleep) until the wall clock, sampled with gettimeofday() at millisecond
// granularity, has reached the time of the call plus msdelay milliseconds.
// Returns immediately if msdelay is zero or negative.
void Think(long msdelay) {

   if (msdelay <= 0) return;

   timeval now;
   gettimeofday(&now, 0);
   const long long deadline =
      (long long)now.tv_sec * 1000 + now.tv_usec / 1000 + msdelay;

   // Scratch area for the dummy computation; only the first 100 slots are used
   double scratch[1000];

   for (long long nowms = 0; nowms < deadline; ) {

      // Some floating point work on pseudo random values...
      for (int k = 0; k < 100; k++)
         scratch[k] = sqrt((double)random());

      // ... and some memory traffic
      for (int rep = 0; rep < 100; rep++)
         memmove(scratch + 10, scratch, 90 * sizeof(double));

      gettimeofday(&now, 0);
      nowms = (long long)now.tv_sec * 1000 + now.tv_usec / 1000;
   }
}
int main(int argc, char **argv) {
void *buf;
int vectored_style = 0;
long read_delay = 0;
timeval tv;
double starttime = 0, openphasetime = 0, endtime = 0, closetime = 0;
long long totalbytesread = 0, prevtotalbytesread = 0;
long totalreadscount = 0;
int filezcount = 0;
string summarypref = "$$$";
bool iserror = false;
bool dobytecheck = false;
gettimeofday(&tv, 0);
starttime = tv.tv_sec + tv.tv_usec / 1000000.0;
closetime = openphasetime = starttime;
if (argc < 2) {
cout << endl << endl <<
"This program gets from the standard input a sequence of" << endl <<
" (one for each line, with less than 16M)" << endl <<
" and performs the corresponding read requests towards the given xrootd URL or to ALL" << endl <<
" the xrootd URLS contained in the given file." << endl <<
endl <<
"Usage: TestXrdClient_read [--check] [-DSparmname stringvalue]... [-DIparmname intvalue]..." <<
endl << endl <<
" Where:" << endl <<
" is the xrootd URL of a remote file " << endl <<
" is the read ahead size. Can be 0." << endl <<
" is the size of the internal cache, in bytes. Can be 0." << endl <<
" means 0: no vectored reads (default)," << endl <<
" 1: sync vectored reads," << endl <<
" 2: async vectored reads, do not access the buffer," << endl <<
" 3: async vectored reads, copy the buffers" << endl <<
" (makes it sync through async calls!)" << endl <<
" 4: no vectored reads. Async reads followed by sync reads." << endl <<
" (exploits the multistreaming for single reads)" << endl <<
" 5: don't read, but write data which is compatible with the --check option." << endl <<
" is the optional think time between reads." << endl <<
" note: the think time will comsume cpu cycles, not sleep." << endl <<
" --check verify if the value of the byte at offet i is i%256. Valid only for the single url mode." << endl <<
" -DSparmname stringvalue" << endl <<
" set the internal parm with the string value " << endl <<
" See XrdClientConst.hh for a list of parameters." << endl <<
" -DIparmname intvalue" << endl <<
" set the internal parm with the integer value " << endl <<
" See XrdClientConst.hh for a list of parameters." << endl <<
" Examples: -DSSocks4Server 123.345.567.8 -DISocks4Port 8080 -DIDebugLevel 1" << endl;
exit(1);
}
if (argc > 2)
EnvPutInt( NAME_READAHEADSIZE, atol(argv[2]));
if (argc >= 3)
EnvPutInt( NAME_READCACHESIZE, atol(argv[3]));
if (argc > 4)
vectored_style = atol(argv[4]);
cout << "Read style: ";
switch (vectored_style) {
case 0:
cout << "Synchronous reads, ev. with read ahead." << endl;
break;
case 1:
cout << "Synchronous readv" << endl;
break;
case 2:
cout << "Asynchronous readv, data is not processed." << endl;
break;
case 3:
cout << "Asynchronous readv." << endl;
break;
case 4:
cout << "Asynchronous reads." << endl;
break;
case 5:
cout << "Write test file." << endl;
open_opts |= kXR_open_updt;
break;
default:
cout << "Unknown." << endl;
break;
}
if (argc > 5)
read_delay = atol(argv[5]);
// The other args, they have to be an even number. Odd only if --check is there
if (argc > 6)
for (int i=6; i < argc; i++) {
if (strstr(argv[i], "--check") == argv[i]) {
cerr << "Enabling file content check." << endl;
dobytecheck = true;
continue;
}
if ( (strstr(argv[i], "-DS") == argv[i]) &&
(argc >= i+2) ) {
cerr << "Overriding " << argv[i]+3 << " with value " << argv[i+1] << ". ";
EnvPutString( argv[i]+3, argv[i+1] );
cerr << " Final value: " << EnvGetString(argv[i]+3) << endl;
i++;
continue;
}
if ( (strstr(argv[i], "-DI") == argv[i]) &&
(argc >= i+2) ) {
cerr << "Overriding '" << argv[i]+3 << "' with value " << argv[i+1] << ". ";
EnvPutInt( argv[i]+3, atoi(argv[i+1]) );
cerr << " Final value: " << EnvGetLong(argv[i]+3) << endl;
i++;
continue;
}
}
buf = malloc(200*1024*1024);
// Check if we have a file or a root:// url
bool isrooturl = (strstr(argv[1], "root://"));
int retval = 0;
int ntoread = 0;
int maxtoread = 20480;
kXR_int64 v_offsets[20480];
kXR_int32 v_lens[20480];
if (isrooturl) {
MyXrdClientCallback mycb;
XrdClient *cli = new XrdClient(argv[1], &mycb, (void *)1234);
cli->Open(open_mode, open_opts | ( (vectored_style > 4) ? kXR_delete : 0 ) );
filezcount = 1;
gettimeofday(&tv, 0);
openphasetime = tv.tv_sec + tv.tv_usec / 1000000.0;
while ( (ntoread = ReadSome(v_offsets, v_lens, maxtoread, totalbytesread)) ) {
cout << ".";
totalreadscount += ntoread;
switch (vectored_style) {
case 0: // no readv
for (int iii = 0; iii < ntoread; iii++) {
retval = cli->Read(buf, v_offsets[iii], v_lens[iii]);
if (retval <= 0) {
cout << endl << "---Read (" << iii << " of " << ntoread << " " <<
v_lens[iii] << "@" << v_offsets[iii] <<
" returned " << retval << endl;
iserror = true;
break;
}
else {
if (dobytecheck)
for ( unsigned int jj = 0; jj < (unsigned)v_lens[iii]; jj++) {
if ( ((jj+v_offsets[iii]) % 256) != ((unsigned char *)buf)[jj] ) {
cout << "errore nel file all'offset= " << jj+v_offsets[iii] <<
" letto: " << (int)((unsigned char *)buf)[jj] << " atteso: " << (jj+v_offsets[iii]) % 256 << endl;
iserror = true;
break;
}
}
if (!((iii+1) % 100)) Think(read_delay);
}
}
break;
case 1: // sync
retval = cli->ReadV((char *)buf, v_offsets, v_lens, ntoread);
cout << endl << "---ReadV returned " << retval << endl;
if (retval > 0)
for (int iii = 0; iii < ntoread; iii++){
if (!((iii+1) % 100)) Think(read_delay);
}
else {
iserror = true;
break;
}
break;
case 2: // async
retval = cli->ReadV(0, v_offsets, v_lens, ntoread);
cout << endl << "---ReadV returned " << retval << endl;
break;
case 3: // async and immediate read, optimized!
cli->RemoveAllDataFromCache();
for (int ii = 0; ii < ntoread+512; ii+=512) {
if (ii < ntoread) {
// Read a chunk of data
retval = cli->ReadV(0, v_offsets+ii, v_lens+ii, xrdmin(ntoread - ii, 512) );
cout << endl << "---ReadV returned " << retval << endl;
if (retval <= 0) {
iserror = true;
break;
}
}
// Process the preceeding chunk while the last is coming
for (int iii = ii-512; (iii >= 0) && (iii < ii) && (iii < ntoread); iii++) {
retval = cli->Read(buf, v_offsets[iii], v_lens[iii]);
if (retval <= 0)
cout << endl << "---Read (" << iii << " of " << ntoread << " " <<
v_lens[iii] << "@" << v_offsets[iii] <<
" returned " << retval << endl;
if (dobytecheck)
for ( unsigned int jj = 0; jj < (unsigned)v_lens[iii]; jj++) {
if ( ((jj+v_offsets[iii]) % 256) != ((unsigned char *)buf)[jj] ) {
cout << "errore nel file all'offset= " << jj+v_offsets[iii] <<
" letto: " << (int)((unsigned char *)buf)[jj] << " atteso: " << (jj+v_offsets[iii]) % 256 << endl;
iserror = true;
break;
}
}
if (!((iii+1) % 100)) Think(read_delay);
}
}
retval = 1;
break;
case 4: // read async and then read
cli->RemoveAllDataFromCache();
for (int iii = -512; iii < ntoread; iii++) {
if (iii + 512 < ntoread)
retval = cli->Read_Async(v_offsets[iii+512], v_lens[iii+512]);
if (retval <= 0) {
cout << endl << "---Read_Async (" << iii+512 << " of " << ntoread << " " <<
v_lens[iii+512] << "@" << v_offsets[iii+512] <<
" returned " << retval << endl;
iserror = true;
break;
}
if (iii >= 0) {
retval = cli->Read(buf, v_offsets[iii], v_lens[iii]);
if (retval <= 0) {
cout << endl << "---Read (" << iii << " of " << ntoread << " " <<
v_lens[iii] << "@" << v_offsets[iii] <<
" returned " << retval << endl;
iserror = true;
break;
}
if (!((iii+1) % 100)) Think(read_delay);
}
}
break;
case 5: // don't read... write
for (int iii = 0; iii < ntoread; iii++) {
for (int kkk = 0; kkk < v_lens[iii]; kkk++)
((unsigned char *)buf)[kkk] = (v_offsets[iii]+kkk) % 256;
retval = cli->Write(buf, v_offsets[iii], v_lens[iii]);
if (retval <= 0) {
cout << endl << "---Write (" << iii << " of " << ntoread << ") " <<
v_lens[iii] << "@" << v_offsets[iii] <<
" returned " << retval << endl;
iserror = true;
break;
}
if (retval > 0) {
if (!((iii+1) % 100)) Think(read_delay);
}
}
break;
} // switch
if (!cli->IsOpen_wait()) {
iserror = true;
break;
}
} // while
gettimeofday(&tv, 0);
closetime = tv.tv_sec + tv.tv_usec / 1000000.0;
cli->Close();
delete cli;
cli = 0;
}
else {
// Same test on multiple filez
vector xrdcvec;
ifstream filez(argv[1]);
int i = 0, fnamecount = 0;;
XrdClientUrlInfo u;
// Open all the files (in parallel man!)
while (!filez.eof()) {
string s;
XrdClient * cli;
filez >> s;
if (s != "") {
fnamecount++;
cli = new XrdClient(s.c_str());
u.TakeUrl(s.c_str());
if (cli->Open( open_mode, open_opts | ((vectored_style > 4) ? kXR_delete : 0) )) {
cout << "--- Open of " << s << " in progress." << endl;
xrdcvec.push_back(cli);
}
else delete cli;
}
i++;
}
filez.close();
filezcount = xrdcvec.size();
cout << "--- All the open requests have been submitted" << endl;
if (fnamecount == filezcount) {
i = 0;
gettimeofday(&tv, 0);
openphasetime = tv.tv_sec + tv.tv_usec / 1000000.0;
while ( (ntoread = ReadSome(v_offsets, v_lens, 10240, totalbytesread)) ) {
switch (vectored_style) {
case 0: // no readv
for (int iii = 0; iii < ntoread; iii++) {
for(int i = 0; i < (int) xrdcvec.size(); i++) {
retval = xrdcvec[i]->Read(buf, v_offsets[iii], v_lens[iii]);
if (retval <= 0) {
cout << endl << "---Read (" << iii << " of " << ntoread << ") " <<
v_lens[iii] << "@" << v_offsets[iii] <<
" returned " << retval << endl;
iserror = true;
break;
}
if (retval > 0) {
if (dobytecheck)
for ( unsigned int jj = 0; jj < (unsigned)v_lens[iii]; jj++) {
if ( ((jj+v_offsets[iii]) % 256) != ((unsigned char *)buf)[jj] ) {
cout << "errore nel file all'offset= " << jj+v_offsets[iii] <<
" letto: " << (int)((unsigned char *)buf)[jj] << " atteso: " << (jj+v_offsets[iii]) % 256 << endl;
iserror = true;
break;
}
}
Think(read_delay);
}
}
}
break;
case 1: // sync
for(int i = 0; i < (int) xrdcvec.size(); i++) {
retval = xrdcvec[i]->ReadV((char *)buf, v_offsets, v_lens, ntoread);
cout << endl << "---ReadV " << xrdcvec[i]->GetCurrentUrl().GetUrl() <<
" of " << ntoread << " chunks " <<
" returned " << retval << endl;
if (retval) {
cout << "start think " << time(0) << endl;
for (int kkk = 0; kkk < ntoread; kkk++) Think(read_delay);
cout << time(0) << endl;
}
else {
iserror = true;
break;
}
}
break;
case 2: // async
for(int i = 0; i < (int) xrdcvec.size(); i++) {
retval = xrdcvec[i]->ReadV((char *)0, v_offsets, v_lens, ntoread);
cout << endl << "---ReadV " << xrdcvec[i]->GetCurrentUrl().GetUrl() <<
" returned " << retval << endl;
}
break;
case 3: // async readv and immediate read, optimized!
for (int ii = 0; ii < ntoread; ii+=4096) {
// Read a chunk of data
for(int i = 0; i < (int) xrdcvec.size(); i++) {
retval = xrdcvec[i]->ReadV((char *)0, v_offsets+ii, v_lens+ii, xrdmin(4096, ntoread-ii));
cout << endl << "---ReadV " << xrdcvec[i]->GetCurrentUrl().GetUrl() <<
" of " << xrdmin(4096, ntoread-ii) << " chunks " <<
" returned " << retval << endl;
if (retval <= 0) {
iserror = true;
break;
}
}
// Process the preceeding chunk while the last is coming
for (int iii = ii-4096; (iii >= 0) && (iii < ii); iii++) {
for(int i = 0; i < (int) xrdcvec.size(); i++) {
retval = xrdcvec[i]->Read(buf, v_offsets[iii], v_lens[iii]);
if (retval <= 0)
cout << endl << "---Read " << xrdcvec[i]->GetCurrentUrl().GetUrl() <<
"(" << iii << " of " << ntoread << ") " <<
v_lens[iii] << "@" << v_offsets[iii] <<
" returned " << retval << endl;
if (retval > 0) {
if (dobytecheck)
for ( unsigned int jj = 0; jj < (unsigned)v_lens[iii]; jj++) {
if ( ((jj+v_offsets[iii]) % 256) != ((unsigned char *)buf)[jj] ) {
cout << "errore nel file all'offset= " << jj+v_offsets[iii] <<
" letto: " << (int)((unsigned char *)buf)[jj] << " atteso: " << (jj+v_offsets[iii]) % 256 << endl;
iserror = true;
break;
}
}
Think(read_delay);
}
}
}
}
retval = 1;
break;
case 4: // read async and then read
// Start being in advance of 512 reads per file
for(int i = 0; i < (int) xrdcvec.size(); i++) {
for (int jj = 0; jj < xrdmin(512, ntoread); jj++) {
retval = xrdcvec[i]->Read_Async(v_offsets[jj], v_lens[jj]);
if (retval != kOK) {
cout << endl << "---Read_Async " << xrdcvec[i]->GetCurrentUrl().GetUrl() <<
"(" << jj << " of " << ntoread << ") " <<
v_lens[jj] << "@" << v_offsets[jj] <<
" returned " << retval << endl;
break;
}
}
}
// Then read everything
for (int ii = 0; ii < ntoread; ii++) {
// Read_async a chunk of data per file
for(int i = 0; i < (int) xrdcvec.size(); i++) {
if (ii + 512 < ntoread)
retval = xrdcvec[i]->Read_Async(v_offsets[ii+512], v_lens[ii+512]);
if (retval != kOK) {
cout << endl << "---Read_Async " << xrdcvec[i]->GetCurrentUrl().GetUrl() <<
"(" << ii+512 << " of " << ntoread << ") " <<
v_lens[ii+512] << "@" << v_offsets[ii+512] <<
" returned " << retval << endl;
}
}
// Now process one chunk per file
for(int i = 0; i < (int) xrdcvec.size(); i++) {
retval = xrdcvec[i]->Read(buf, v_offsets[ii], v_lens[ii]);
if (retval > 0) {
if (dobytecheck)
for ( unsigned int jj = 0; jj < (unsigned)v_lens[ii]; jj++) {
if ( ((jj+v_offsets[ii]) % 256) != ((unsigned char *)buf)[jj] ) {
cout << "errore nel file all'offset= " << jj+v_offsets[ii] <<
" letto: " << (int)((unsigned char *)buf)[jj] << " atteso: " << (jj+v_offsets[ii]) % 256 << endl;
iserror = true;
break;
}
}
Think(read_delay);
}
else {
cout << endl << "---Read (" << ii << " of " << ntoread << ") " <<
v_lens[ii] << "@" << v_offsets[ii] <<
" returned " << retval << endl;
iserror = true;
break;
}
if (iserror) break;
} // for i
if (iserror) break;
} // for ii
retval = 1;
break;
case 5: // don't read... write
for (int iii = 0; iii < ntoread; iii++) {
for(int i = 0; i < (int) xrdcvec.size(); i++) {
for (int kkk = 0; kkk < v_lens[iii]; kkk++)
((unsigned char *)buf)[kkk] = (v_offsets[iii]+kkk) % 256;
retval = xrdcvec[i]->Write(buf, v_offsets[iii], v_lens[iii]);
if (retval <= 0) {
cout << endl << "---Write (" << iii << " of " << ntoread << ") " <<
v_lens[iii] << "@" << v_offsets[iii] <<
" returned " << retval << endl;
iserror = true;
break;
}
if (retval > 0) {
Think(read_delay);
}
}
}
break;
} // switch
if (iserror && prevtotalbytesread) {
totalbytesread = prevtotalbytesread;
break;
}
prevtotalbytesread = totalbytesread;
totalreadscount += ntoread;
} // while readsome
gettimeofday(&tv, 0);
closetime = tv.tv_sec + tv.tv_usec / 1000000.0;
cout << endl << endl << "--- Closing all instances" << endl;
for(int i = 0; i < (int) xrdcvec.size(); i++) {
if (xrdcvec[i]->IsOpen()) xrdcvec[i]->Close();
else cout << "WARNING: file '" <<
xrdcvec[i]->GetCurrentUrl().GetUrl() << " was not opened." << endl;
}
cout << "--- Deleting all instances" << endl;
for(int i = 0; i < (int) xrdcvec.size(); i++) delete xrdcvec[i];
cout << "--- Clearing pointer vector" << endl;
xrdcvec.clear();
} //if fnamecount == filezcount
} // Case of multiple urls
cout << "--- Freeing buffer" << endl;
free(buf);
gettimeofday(&tv, 0);
endtime = tv.tv_sec + tv.tv_usec / 1000000.0;
if (iserror) summarypref = "%%%";
cout << "Summary ----------------------------" << endl;
cout << summarypref << " starttime: " << starttime << endl;
cout << summarypref << " lastopentime: " << openphasetime << endl;
cout << summarypref << " closetime: " << closetime << endl;
cout << summarypref << " endtime: " << endtime << endl;
cout << summarypref << " open_elapsed: " << openphasetime - starttime << endl;
cout << summarypref << " data_xfer_elapsed: " << closetime - openphasetime << endl;
cout << summarypref << " close_elapsed: " << endtime - closetime << endl;
cout << summarypref << " total_elapsed: " << endtime - starttime << endl;
cout << summarypref << " totalbytesreadperfile: " << totalbytesread << endl;
cout << summarypref << " maxbytesreadpersecperfile: " << totalbytesread / (closetime - openphasetime) << endl;
cout << summarypref << " effbytesreadpersecperfile: " << totalbytesread / (endtime - starttime) << endl;
cout << summarypref << " readscountperfile: " << totalreadscount << endl;
cout << summarypref << " openedkofilescount: " << filezcount << endl;
cout << endl;
return 0;
}