/******************************************************************************/
/* */
/* X r d C l i e n t R e a d V . c c */
/* */
/* Author: Fabrizio Furano (INFN Padova, 2006) */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// Helper functions for the vectored read functionality //
// //
//////////////////////////////////////////////////////////////////////////
#include "XrdClient/XrdClientProtocol.hh"
#include "XrdClient/XrdClientReadV.hh"
#include "XrdClient/XrdClientConn.hh"
#include "XrdClient/XrdClientDebug.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include
// Builds a kXR_readv request for nreq chunks taken from reqvect, starting at
// index firstreq, and sends it to the server through xrdc.
//
// If destbuf != 0 the request is sent synchronously and the received chunks
// are unpacked into destbuf by UnpackReadVResp().
// If destbuf == 0 a placeholder is submitted to the cache for every chunk and
// the request is sent asynchronously.
//
// handle points to the file handle: its first 4 bytes are copied into every
// element of the request list.
// streamtosend is accepted but not used by this function.
//
// Returns:
//  - sync mode:  the value returned by UnpackReadVResp(), or -1 if the
//                request failed
//  - async mode: the sum of the requested lengths, or 0 if the request could
//                not be written to the server
//  - 0 if nreq <= 0 (nothing is sent)
//  - if nreq > READV_MAXCHUNKS nothing is sent and the failure value of the
//    selected mode is returned (-1 sync, 0 async)
kXR_int64 XrdClientReadV::ReqReadV(XrdClientConn *xrdc, char *handle, char *destbuf,
                                   XrdClientVector &reqvect,
                                   int firstreq, int nreq, int streamtosend) {

    readahead_list buflis[READV_MAXCHUNKS];

    Info(XrdClientDebug::kUSERDEBUG, "ReqReadV",
         "Requesting to read " << nreq <<
         " chunks.");

    // buflis lives on the stack and only has room for READV_MAXCHUNKS
    // elements: a larger request would write past its end.
    if (nreq > READV_MAXCHUNKS) {
        Error("ReqReadV", "Too many chunks requested: " << nreq <<
              " max allowed " << READV_MAXCHUNKS);
        return destbuf ? -1 : 0;
    }

    kXR_int64 total_len = 0;

    // Now we build the protocol-ready read ahead list
    // and also put the correct placeholders inside the cache
    for (int i = 0; i < nreq; i++) {

        memcpy( &(buflis[i].fhandle), handle, 4 );

        // Placeholders are needed only when the data will arrive asynchronously
        if (!destbuf)
            xrdc->SubmitPlaceholderToCache(reqvect[firstreq+i].offset,
                                           reqvect[firstreq+i].offset +
                                           reqvect[firstreq+i].len-1);

        buflis[i].offset = reqvect[firstreq+i].offset;
        buflis[i].rlen = reqvect[firstreq+i].len;
        total_len += buflis[i].rlen;
    }

    if (nreq > 0) {

        // Prepare a request header
        ClientRequest readvFileRequest;
        memset( &readvFileRequest, 0, sizeof(readvFileRequest) );
        xrdc->SetSID(readvFileRequest.header.streamid);
        readvFileRequest.header.requestid = kXR_readv;
        readvFileRequest.readv.dlen = nreq * sizeof(struct readahead_list);

        if (destbuf) {
            // A buffer able to hold the data and the info about the chunks
            char *res_buf = new char[total_len + (nreq * sizeof(struct readahead_list))];

            // The list is marshalled for the send and unmarshalled right after,
            // because UnpackReadVResp() compares it with the received headers
            clientMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
            bool r = xrdc->SendGenCommand(&readvFileRequest, buflis, 0,
                                          (void *)res_buf, FALSE, (char *)"ReadV");
            clientUnMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);

            if ( r ) {
                total_len = UnpackReadVResp(destbuf, res_buf,
                                            xrdc->LastServerResp.dlen,
                                            buflis,
                                            nreq);
            }
            else
                total_len = -1;

            delete [] res_buf;
        }
        else {
            clientMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
            if (xrdc->WriteToServer_Async(&readvFileRequest,
                                          buflis) != kOK )
                total_len = 0;
        }

    }

    Info(XrdClientDebug::kHIDEBUG, "ReqReadV",
         "Returning: total_len " << total_len);

    return total_len;
}
// Picks a readv response and puts the individual chunks into the dest buffer
//
// respdata/respdatalen: the raw response. It is parsed as a sequence of
//   elements, each one being a readahead_list header (offset and rlen in
//   network byte order) immediately followed by rlen bytes of data.
// buflis/nbuf: the list of the chunks that were requested; its offset and
//   rlen fields are compared with the byte-swapped values of the headers.
// destbuf: receives the data of the chunks, one after the other.
//
// Returns the number of bytes copied into destbuf, or -1 if the response is
// malformed or does not match the request. If respdatalen is not positive
// nothing is copied and respdatalen itself is returned.
kXR_int32 XrdClientReadV::UnpackReadVResp(char *destbuf, char *respdata, kXR_int32 respdatalen,
                                          readahead_list *buflis, int nbuf) {

    int res = respdatalen;
    const kXR_int32 hdrlen = static_cast<kXR_int32>(sizeof(struct readahead_list));

    // A response can never legitimately carry more data than the total that
    // was requested. NOTE(review): destbuf is presumably sized for that
    // total - confirm against the callers.
    kXR_int64 max_total = 0;
    for (int k = 0; k < nbuf; k++)
        max_total += buflis[k].rlen;

    // I just rebuild the readahead_list element
    struct readahead_list header;
    kXR_int32 pos_from = 0, pos_to = 0;
    int i = 0;
    kXR_int64 cur_buf_offset = -1;
    int cur_buf_len = 0, cur_buf = 0;

    while ( (pos_from < respdatalen) && (i < nbuf) ) {

        // The header must be entirely contained in the response
        if (respdatalen - pos_from < hdrlen) {
            res = -1;
            break;
        }

        memcpy(&header, respdata + pos_from, sizeof(struct readahead_list));

        kXR_int64 tmpl;
        memcpy(&tmpl, &header.offset, sizeof(kXR_int64) );
        tmpl = ntohll(tmpl);
        memcpy(&header.offset, &tmpl, sizeof(kXR_int64) );

        header.rlen = ntohl(header.rlen);

        // rlen comes from the network: it must be non negative (it is going
        // to be a memcpy length) and its data must fit in what is left of the
        // response after the header
        if ( (header.rlen < 0) ||
             (header.rlen > respdatalen - pos_from - hdrlen) ) {
            res = -1;
            break;
        }

        // Never copy more than the total amount of data that was requested
        if (static_cast<kXR_int64>(pos_to) + header.rlen > max_total) {
            res = -1;
            break;
        }

        // Do some consistency checks
        if (cur_buf_len == 0) {
            cur_buf_offset = header.offset;
            if (cur_buf_offset != buflis[cur_buf].offset) {
                res = -1;
                break;
            }

            cur_buf_len += header.rlen;
            if (cur_buf_len > buflis[cur_buf].rlen) {
                res = -1;
                break;
            }

            if (cur_buf_len == buflis[cur_buf].rlen) {
                cur_buf++;
                cur_buf_len = 0;
            }
        }

        pos_from += hdrlen;
        memcpy( &destbuf[pos_to], &respdata[pos_from], header.rlen);
        pos_from += header.rlen;
        pos_to += header.rlen;
        i++;
    }

    if (pos_from != respdatalen || i != nbuf)
        Error("UnpackReadVResp","Inconsistency: pos_from " << pos_from <<
              " respdatalen " << respdatalen <<
              " i " << i <<
              " nbuf " << nbuf );

    if (res > 0)
        res = pos_to;

    return res;
}
// Picks a readv response and puts the individual chunks into the cache
//
// respdata/respdatalen: the raw response. It is parsed as a sequence of
//   elements, each one being a readahead_list header (offset and rlen in
//   network byte order) immediately followed by rlen bytes of data.
//
// Every chunk is copied into a freshly malloc'ed buffer which is handed to
// xrdc->SubmitRawDataToCache() together with its first and last offset.
// NOTE(review): the cache is presumably the new owner of that buffer, since
// it is not freed here - confirm against SubmitRawDataToCache().
//
// respdata is always released with free() before returning.
//
// Returns the number of bytes of the response that were consumed, or -1 if
// the response is malformed or a chunk buffer cannot be allocated. In that
// case the chunks preceding the bad one have already been submitted.
int XrdClientReadV::SubmitToCacheReadVResp(XrdClientConn *xrdc, char *respdata,
                                           kXR_int32 respdatalen) {

    int res = 0;
    const kXR_int32 hdrlen = static_cast<kXR_int32>(sizeof(struct readahead_list));

    // I just rebuild the readahead_list element
    struct readahead_list header;
    kXR_int32 pos_from = 0;
    kXR_int32 rlen = 0;
    kXR_int64 offs = 0;

    while ( pos_from < respdatalen ) {

        // The header must be entirely contained in the response
        if (respdatalen - pos_from < hdrlen) {
            Error("SubmitToCacheReadVResp", "Truncated chunk header at pos " << pos_from <<
                  " respdatalen " << respdatalen);
            res = -1;
            break;
        }

        memcpy(&header, respdata + pos_from, sizeof(struct readahead_list));
        offs = ntohll(header.offset);
        rlen = ntohl(header.rlen);
        pos_from += hdrlen;

        // rlen comes from the network: it is used as allocation and copy
        // size, hence it must be non negative and its data must fit in what
        // is left of the response
        if ( (rlen < 0) || (rlen > respdatalen - pos_from) ) {
            Error("SubmitToCacheReadVResp", "Bad chunk len " << rlen <<
                  " at pos " << pos_from <<
                  " respdatalen " << respdatalen);
            res = -1;
            break;
        }

        // NOTE: we must duplicate the buffer to be submitted, since a cache block has to be
        // contained in one single memblock, while here we have one for multiple chunks.
        void *newbuf = malloc(rlen);
        if (!newbuf && (rlen > 0)) {
            Error("SubmitToCacheReadVResp", "Unable to allocate " << rlen <<
                  " bytes for a chunk");
            res = -1;
            break;
        }

        // malloc(0) is allowed to return a null pointer: do not pass it to memcpy
        if (rlen > 0)
            memcpy(newbuf, &respdata[pos_from], rlen);

        xrdc->SubmitRawDataToCache(newbuf, offs, offs + rlen - 1);

        pos_from += rlen;
    }

    if (res >= 0)
        res = pos_from;

    free( respdata );

    return res;
}
// Process a single subchunk request, eventually splitting it into more than one
//
// The request for len bytes at offset offs is first truncated so that it does
// not go past filelen, then it is appended to reqvect as one or more
// XrdClientReadVinfo elements, none of them longer than spltsize nor than
// READV_MAXCHUNKSIZE.
//
// Nothing is appended if len is not positive or offs is not less than filelen.
// A spltsize which is not positive is treated as READV_MAXCHUNKSIZE.
void XrdClientReadV::PreProcessChunkRequest(XrdClientVector &reqvect,
                                            kXR_int64 offs, kXR_int32 len,
                                            kXR_int64 filelen,
                                            kXR_int32 spltsize) {

    // The number of bytes between offs and the end of the file. It is kept
    // in 64 bits: narrowing a large negative value to 32 bits could turn it
    // into a positive length.
    const kXR_int64 avail = filelen - offs;
    if ( (avail <= 0) || (len <= 0) )
        return;

    const kXR_int32 newlen = (avail < len) ? static_cast<kXR_int32>(avail) : len;

    // We want blocks whose len does not exceed READV_MAXCHUNKSIZE.
    // A split size <= 0 would produce chunks which never advance len_ok,
    // making the loop below add elements forever.
    if ( (spltsize <= 0) || (spltsize > READV_MAXCHUNKSIZE) )
        spltsize = READV_MAXCHUNKSIZE;

    kXR_int32 len_ok = 0;
    while (len_ok < newlen) {
        XrdClientReadVinfo nfo;

        nfo.offset = offs+len_ok;
        nfo.len = xrdmin(newlen-len_ok, spltsize);

        reqvect.Push_back(nfo);

        len_ok += nfo.len;
    }
}