/******************************************************************************/ /* */ /* X r d C l i e n t M S t r e a m . c c */ /* */ /* Author: Fabrizio Furano (INFN Padova, 2006) */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ ////////////////////////////////////////////////////////////////////////// // // // Helper code for XrdClient to handle multistream behavior // // Functionalities dealing with // // mstream creation on init // // decisions to add/remove one // // // ////////////////////////////////////////////////////////////////////////// #include "XrdClient/XrdClientProtocol.hh" #include "XrdClient/XrdClientMStream.hh" #include "XrdClient/XrdClientLogConnection.hh" #include "XrdClient/XrdClientEnv.hh" #include "XrdClient/XrdClientDebug.hh" #include "XrdClient/XrdClientThread.hh" // This has to be a socket id pool which the server will never assign by itself // Moreover, socketids are local to an instance of XrdClientPSock #define XRDCLI_PSOCKTEMP -1000 struct ParStreamOpenerArgs { XrdClientThread *thr; XrdClientConn *cliconn; int wan_port, wan_window; int tmpid; }; //_____________________________________________________________________________ void *ParStreamOpenerThread(void *arg, XrdClientThread *thr) { // This one just opens a new stream // Mask all allowed signals if (thr->MaskSignal(0) != 0) Error("ParStreamOpenerThread", "Warning: problems masking signals"); ParStreamOpenerArgs *parms = (ParStreamOpenerArgs *)arg; XrdClientMStream::AddParallelStream(parms->cliconn, parms->wan_port, parms->wan_window, parms->tmpid); return 0; } int XrdClientMStream::EstablishParallelStreams(XrdClientConn *cliconn) { int mx = EnvGetLong(NAME_MULTISTREAMCNT); int i, res; int wan_port = 0, wan_window = 0; if (mx <= 1) return 1; if (cliconn->GetServerType() == kSTBaseXrootd || cliconn->GetServerType() == kSTMetaXrootd) return 1; // Get the XrdClientPhyconn to be used XrdClientPhyConnection *phyconn = XrdClientConn::GetPhyConn(cliconn->GetLogConnID()); if (!phyconn) return 0; // For a given phyconn we allow only one single attempt to establish multiple streams // Any other thread or subsequent attempt will exit if (phyconn->TestAndSetMStreamsGoing()) return 1; // Query the server config, for the WAN port and the windowsize char *qryitems = (char *)"wan_port wan_window"; ClientRequest qryRequest; char qryResp[1024]; memset( &qryRequest, 0, sizeof(qryRequest) ); memset( qryResp, 0, 1024 ); cliconn->SetSID(qryRequest.header.streamid); qryRequest.header.requestid = kXR_query; qryRequest.query.infotype = kXR_Qconfig; qryRequest.header.dlen = strlen(qryitems); res = cliconn->SendGenCommand(&qryRequest, qryitems, 0, qryResp, false, (char *)"QueryConfig"); if (res && (cliconn->LastServerResp.status == kXR_ok) && cliconn->LastServerResp.dlen) { sscanf(qryResp, "%d\n%d", &wan_port, &wan_window); Info(XrdClientDebug::kUSERDEBUG, "XrdClientMStream::EstablishParallelStreams", "Server WAN parameters: port=" << wan_port << " windowsize=" << wan_window ); } // Start the whole bunch of asynchronous connection requests // By starting one thread for each, calling AddParallelStream once // If no more threads are available, wait and retry ParStreamOpenerArgs paropeners[16]; for (i = 0; i < mx; i++) { paropeners[i].thr = 0; paropeners[i].cliconn = cliconn; paropeners[i].wan_port = wan_port; paropeners[i].wan_window = wan_window; paropeners[i].tmpid = 0; } for (i = 0; i < mx; i++) { Info(XrdClientDebug::kHIDEBUG, "XrdClientMStream::EstablishParallelStreams", "Trying to establish " << i+1 << "th substream." ); paropeners[i].thr = new XrdClientThread(ParStreamOpenerThread); if (paropeners[i].thr) { paropeners[i].tmpid = XRDCLI_PSOCKTEMP - i; if (paropeners[i].thr->Run(&paropeners[i])) { Error("XrdClientMStream::EstablishParallelStreams", "Error establishing " << i+1 << "th substream. Thread start failed."); delete paropeners[i].thr; paropeners[i].thr = 0; break; } } } for (i = 0; i < mx; i++) if (paropeners[i].thr) { Info(XrdClientDebug::kHIDEBUG, "XrdClientMStream::EstablishParallelStreams", "Waiting for substream " << i+1 << "." ); paropeners[i].thr->Join(0); delete paropeners[i].thr; } // If something goes wrong, stop adding new streams //if (AddParallelStream(cliconn, wan_port, wan_window, XRDCLI_PSOCKTEMP - i)) // break; Info(XrdClientDebug::kHIDEBUG, "XrdClientMStream::EstablishParallelStreams", "Parallel streams establishment finished." ); return i; } // Add a parallel stream to the pool used by the given client inst // Returns 0 if ok int XrdClientMStream::AddParallelStream(XrdClientConn *cliconn, int port, int windowsz, int tempid) { // Get the XrdClientPhyconn to be used XrdClientPhyConnection *phyconn = XrdClientConn::GetPhyConn(cliconn->GetLogConnID()); // If the phyconn already has all the needed streams... exit if (phyconn->GetSockIdCount() > EnvGetLong(NAME_MULTISTREAMCNT)) return 0; // Connect a new connection, set the temp socket id and get the descriptor // Temporary means that we need one to communicate, but its final id // will be given by the server int sockdescr = phyconn->TryConnectParallelStream(port, windowsz, tempid); if (sockdescr < 0) return -1; // The connection now is here but has not yet to be considered by the reader threads // before having handshaked it, and this has to be sync man // Do the handshake ServerInitHandShake xbody; if (phyconn->DoHandShake(xbody, tempid) == kSTError) return -1; // Send the kxr_bind req to get a new substream id, going to be the final one int newid = -1; int res = -1; if (BindPendingStream(cliconn, tempid, newid) && phyconn->IsValid() ) { // Everything ok, Establish the new connection with the new id res = phyconn->EstablishPendingParallelStream(tempid, newid); if (res) { // If the establish failed we have to remove the pending stream RemoveParallelStream(cliconn, tempid); return res; } // After everything make the reader thread aware of the new stream phyconn->UnBanSockDescr(sockdescr); phyconn->ReinitFDTable(); } else { // If the bind failed we have to remove the pending stream RemoveParallelStream(cliconn, tempid); return -1; } Info(XrdClientDebug::kHIDEBUG, "XrdClientMStream::EstablishParallelStreams", "Substream added." ); return 0; } // Remove a parallel stream to the pool used by the given client inst int XrdClientMStream::RemoveParallelStream(XrdClientConn *cliconn, int substream) { // Get the XrdClientPhyconn to be used XrdClientLogConnection *log = ConnectionManager->GetConnection(cliconn->GetLogConnID()); if (!log) return 0; XrdClientPhyConnection *phyconn = log->GetPhyConnection(); if (phyconn) phyconn->RemoveParallelStream(substream); return 0; } // Binds the pending temporary parallel stream to the current session // Returns the substreamid assigned by the server into newid bool XrdClientMStream::BindPendingStream(XrdClientConn *cliconn, int substreamid, int &newid) { bool res = false; // Prepare request ClientRequest bindFileRequest; XrdClientConn::SessionIDInfo sess; ServerResponseBody_Bind bndresp; // Get the XrdClientPhyconn to be used XrdClientPhyConnection *phyconn = ConnectionManager->GetConnection(cliconn->GetLogConnID())->GetPhyConnection(); cliconn->GetSessionID(sess); memset( &bindFileRequest, 0, sizeof(bindFileRequest) ); cliconn->SetSID(bindFileRequest.header.streamid); bindFileRequest.bind.requestid = kXR_bind; memcpy( bindFileRequest.bind.sessid, sess.id, sizeof(sess.id) ); // The request has to be sent through the stream which has to be bound! clientMarshall(&bindFileRequest); res = phyconn->WriteRaw(&bindFileRequest, sizeof(bindFileRequest), substreamid); if (!res) return false; ServerResponseHeader hdr; int rdres = 0; // Now wait for the header, on the same substream rdres = phyconn->ReadRaw(&hdr, sizeof(ServerResponseHeader), substreamid); if (rdres < (int)sizeof(ServerResponseHeader)) { Error("BindPendingStream", "Error reading bind response header for substream " << substreamid << "."); return false; } clientUnmarshall(&hdr); // Now wait for the response data, if any // This code is specialized. // If the answer is not what we were expecting, just return false, // expecting that this connection will be shut down if (hdr.status != kXR_ok) { Error("BindPendingStream", "Server denied binding for substream " << substreamid << "."); return false; } if (hdr.dlen != sizeof(bndresp)) { Error("BindPendingStream", "Unrecognized response datalen binding substream " << substreamid << "."); return false; } rdres = phyconn->ReadRaw(&bndresp, sizeof(bndresp), substreamid); if (rdres != sizeof(bndresp)) { Error("BindPendingStream", "Error reading response binding substream " << substreamid << "."); return false; } newid = bndresp.substreamid; return res; } void XrdClientMStream::GetGoodSplitParameters(XrdClientConn *cliconn, int &spltsize, int &reqsperstream, kXR_int32 len) { spltsize = DFLT_MULTISTREAMSPLITSIZE; reqsperstream = 4; // Let's try to distribute the load into maximum sized chunks if (cliconn->GetParallelStreamCount() > 1) { // We start seeing which length we get trying to fill all the // available slots ( per stream) int candlen = xrdmax(DFLT_MULTISTREAMSPLITSIZE, len / (reqsperstream * (cliconn->GetParallelStreamCount()-1)) + 1 ); // We don't want blocks smaller than a min value // If this is the case we consider only one slot per stream if (candlen < DFLT_MULTISTREAMSPLITSIZE) { spltsize = xrdmax(DFLT_MULTISTREAMSPLITSIZE, len / (cliconn->GetParallelStreamCount()-1) + 1 ); reqsperstream = 1; } else spltsize = candlen; } else spltsize = len; //cout << "parstreams: " << cliconn->GetParallelStreamCount() << // " len: " << len << " splitsize: " << spltsize << " reqsperstream: " << // reqsperstream << endl << endl; } // This splits a long requests into many smaller requests, to be sent in parallel // through multiple streams // Returns false if the chunk is not worth splitting bool XrdClientMStream::SplitReadRequest(XrdClientConn *cliconn, kXR_int64 offset, kXR_int32 len, XrdClientVector &reqlists) { int spltsize = 0; int reqsperstream = 0; GetGoodSplitParameters(cliconn, spltsize, reqsperstream, len); for (kXR_int32 pp = 0; pp < len; pp += spltsize) { ReadChunk ck; ck.offset = pp+offset; ck.len = xrdmin(len - pp, spltsize); ck.streamtosend = cliconn->GetParallelStreamToUse(reqsperstream); reqlists.Push_back(ck); } return true; }