/******************************************************************************/ /* */ /* X r d X r o o t d C a l l B a c k . c c */ /* */ /* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #include #include "Xrd/XrdScheduler.hh" #include "XProtocol/XProtocol.hh" #include "XProtocol/XPtypes.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSfs/XrdSfsInterface.hh" #include "XrdXrootd/XrdXrootdCallBack.hh" #include "XrdXrootd/XrdXrootdFile.hh" #include "XrdXrootd/XrdXrootdMonitor.hh" #include "XrdXrootd/XrdXrootdProtocol.hh" #include "XrdXrootd/XrdXrootdStats.hh" #include "XrdXrootd/XrdXrootdReqID.hh" #include "XrdXrootd/XrdXrootdTrace.hh" /******************************************************************************/ /* L o c a l C l a s s e s */ /******************************************************************************/ class XrdXrootdCBJob : XrdJob { public: static XrdXrootdCBJob *Alloc(XrdXrootdCallBack *cbF, XrdOucErrInfo *erp, const char *Path, int rval); void DoIt(); inline void Recycle(){myMutex.Lock(); Next = FreeJob; FreeJob = this; myMutex.UnLock(); } XrdXrootdCBJob(XrdXrootdCallBack *cbp, XrdOucErrInfo *erp, const char *path, int rval) : XrdJob("async response"), cbFunc(cbp), eInfo(erp), Path(path), Result(rval) {} ~XrdXrootdCBJob() {} private: void DoClose(XrdOucErrInfo *eInfo); void DoStatx(XrdOucErrInfo *eInfo); static XrdSysMutex myMutex; static XrdXrootdCBJob *FreeJob; XrdXrootdCBJob *Next; XrdXrootdCallBack *cbFunc; XrdOucErrInfo *eInfo; const char *Path; int Result; }; /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ extern XrdOucTrace *XrdXrootdTrace; namespace { XrdSysError *eDest; XrdXrootdStats *SI; XrdScheduler *Sched; int Port; } XrdSysMutex XrdXrootdCBJob::myMutex; XrdXrootdCBJob *XrdXrootdCBJob::FreeJob; /******************************************************************************/ /* X r d X r o o t d C B J o b */ /******************************************************************************/ /******************************************************************************/ /* A l l o c */ /******************************************************************************/ XrdXrootdCBJob *XrdXrootdCBJob::Alloc(XrdXrootdCallBack *cbF, XrdOucErrInfo *erp, const char *Path, int rval) { XrdXrootdCBJob *cbj; // Obtain a call back object by trying to avoid new() // myMutex.Lock(); if (!(cbj = FreeJob)) cbj = new XrdXrootdCBJob(cbF, erp, Path, rval); else {cbj->cbFunc = cbF, cbj->eInfo = erp; cbj->Result = rval;cbj->Path = Path; FreeJob = cbj->Next; } myMutex.UnLock(); // Return the new object // return cbj; } /******************************************************************************/ /* D o I t */ /******************************************************************************/ void XrdXrootdCBJob::DoIt() { static const char *TraceID = "DoIt"; // Do some tracing here // TRACE(RSP, eInfo->getErrUser() <<' ' <Func() <<" async callback"); // Some operations differ in the way we handle them. For instance, for open() // if it succeeds then we must force the client to retry the open request // because we can't attach the file to the client here. We do this by asking // the client to wait zero seconds. Protocol demands a client retry. Close // operations are always final and we need to do some cleanup. // if (*(cbFunc->Func()) == 'c') DoClose(eInfo); else if (SFS_OK == Result) {if (*(cbFunc->Func()) == 'o') {int rc = 0; cbFunc->sendResp(eInfo, kXR_wait, &rc);} else {if (*(cbFunc->Func()) == 'x') DoStatx(eInfo); cbFunc->sendResp(eInfo, kXR_ok, 0, eInfo->getErrText(), eInfo->getErrTextLen()); } } else cbFunc->sendError(Result, eInfo, Path); // Tell the requestor that the callback has completed // if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo); else delete eInfo; eInfo = 0; Recycle(); } /******************************************************************************/ /* D o C l o s e */ /******************************************************************************/ void XrdXrootdCBJob::DoClose(XrdOucErrInfo *eInfo) { XrdXrootdFile *fP = (XrdXrootdFile *)eInfo->getErrArg(); // For close the main argument is the file pointer. Set the main arg to // be the request identifier which is saved in he file object. // eInfo->setErrArg(fP->cbArg); // Responses to close() must be final; otherwise it's a systm error. // if (Result != SFS_OK && Result != SFS_ERROR) {char buff[64]; SI->errorCnt++; sprintf(buff, "Invalid close() callcback result of %d for", Result); eDest->Emsg("DoClose", buff, Path); Result = SFS_ERROR; eInfo->setErrInfo(kXR_FSError, "Internal error; file close forced"); } // Send appropriate response (OK or error) // if (Result == SFS_OK) cbFunc->sendResp(eInfo, kXR_ok); else cbFunc->sendError(Result, eInfo, Path); // Delete he file object // delete fP; } /******************************************************************************/ /* D o S t a t x */ /******************************************************************************/ void XrdXrootdCBJob::DoStatx(XrdOucErrInfo *einfo) { const char *tp = einfo->getErrText(); char cflags[2]; int flags; // Skip to the third token // while(*tp && *tp == ' ') tp++; while(*tp && *tp != ' ') tp++; // 1st while(*tp && *tp == ' ') tp++; while(*tp && *tp != ' ') tp++; // 2nd // Convert to flags // flags = atoi(tp); // Convert to proper indicator // if (flags & kXR_offline) cflags[0] = (char)kXR_offline; else if (flags & kXR_isDir) cflags[0] = (char)kXR_isDir; else cflags[0] = (char)kXR_file; // Set the new response // cflags[1] = '\0'; einfo->setErrInfo(0, cflags); } /******************************************************************************/ /* X r d X r o o t d C a l l B a c k */ /******************************************************************************/ /******************************************************************************/ /* D o n e */ /******************************************************************************/ void XrdXrootdCallBack::Done(int &Result, //I/O: Function result XrdOucErrInfo *eInfo, // In: Error information const char *Path) // In: Path related { XrdXrootdCBJob *cbj; // Sending an async response may take a long time. So, we schedule the task // to run asynchronously from the forces that got us here. // if (!(cbj = XrdXrootdCBJob::Alloc(this, eInfo, Path, Result))) {eDest->Emsg("Done",ENOMEM,"get call back job; user",eInfo->getErrUser()); if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo); else delete eInfo; } else Sched->Schedule((XrdJob *)cbj); } /******************************************************************************/ /* S a m e */ /******************************************************************************/ int XrdXrootdCallBack::Same(unsigned long long arg1, unsigned long long arg2) { XrdXrootdReqID ReqID1(arg1), ReqID2(arg2); unsigned char sid1[2], sid2[2]; unsigned int inst1, inst2; int lid1, lid2; ReqID1.getID(sid1, lid1, inst1); ReqID2.getID(sid2, lid2, inst2); return lid1 == lid2; } /******************************************************************************/ /* s e n d E r r o r */ /******************************************************************************/ void XrdXrootdCallBack::sendError(int rc, XrdOucErrInfo *eInfo, const char *Path) { static const char *TraceID = "fsError"; static int Xserr = kXR_ServerError; int ecode; const char *eMsg = eInfo->getErrText(ecode); const char *User = eInfo->getErrUser(); // Process the data response vector (we need to do this here) // if (rc == SFS_DATAVEC) {if (ecode > 1) sendVesp(eInfo, kXR_ok, (struct iovec *)eMsg, ecode); else sendResp(eInfo, kXR_ok, 0); return; } // Optimize error message handling here // if (eMsg && !*eMsg) eMsg = 0; // Process standard errors // if (rc == SFS_ERROR) {SI->errorCnt++; rc = XProtocol::mapError(ecode); sendResp(eInfo, kXR_error, &rc, eMsg, eInfo->getErrTextLen()+1); return; } // Process the redirection (error msg is host:port) // if (rc == SFS_REDIRECT) {SI->redirCnt++; if (ecode <= 0) ecode = (ecode ? -ecode : Port); TRACE(REDIR, User <<" async redir to " << eMsg <<':' <getErrTextLen()); if (XrdXrootdMonitor::Redirect() && Path) XrdXrootdMonitor::Redirect(eInfo->getErrMid(),eMsg,ecode,Opcode,Path); return; } // Process the deferal // if (rc >= SFS_STALL) {SI->stallCnt++; TRACE(STALL, "Stalling " <getErrTextLen()+1); return; } // Process the data response // if (rc == SFS_DATA) {if (ecode) sendResp(eInfo, kXR_ok, 0, eMsg, ecode); else sendResp(eInfo, kXR_ok, 0); return; } // Unknown conditions, report it // {char buff[64]; SI->errorCnt++; ecode = sprintf(buff, "Unknown sfs response code %d", rc); eDest->Emsg("sendError", buff); sendResp(eInfo, kXR_error, &Xserr, buff, ecode+1); return; } } /******************************************************************************/ /* s e n d R e s p */ /******************************************************************************/ void XrdXrootdCallBack::sendResp(XrdOucErrInfo *eInfo, XResponseType Status, int *Data, const char *Msg, int Mlen) { static const char *TraceID = "sendResp"; struct iovec rspVec[4]; XrdXrootdReqID ReqID; int dlen = 0, n = 1; kXR_int32 xbuf; if (Data) {xbuf = static_cast(htonl(*Data)); rspVec[n].iov_base = (caddr_t)(&xbuf); dlen = rspVec[n].iov_len = sizeof(xbuf); n++; // 1 } if (Msg && *Msg) { rspVec[n].iov_base = (caddr_t)Msg; dlen += rspVec[n].iov_len = Mlen; n++; // 2 } // Set the destination // ReqID.setID(eInfo->getErrArg()); // Send the async response // if (XrdXrootdResponse::Send(ReqID, Status, rspVec, n, dlen) < 0) eDest->Emsg("sendResp", eInfo->getErrUser(), Opname, "async resp aborted; user gone."); else if (TRACING(TRACE_RSP)) {XrdXrootdResponse theResp; theResp.Set(ReqID.Stream()); TRACE(RSP, eInfo->getErrUser() <<" async " <extData()) eInfo->Reset(); } /******************************************************************************/ /* s e n d V e s p */ /******************************************************************************/ void XrdXrootdCallBack::sendVesp(XrdOucErrInfo *eInfo, XResponseType Status, struct iovec *ioV, int ioN) { static const char *TraceID = "sendVesp"; XrdXrootdReqID ReqID; int dlen = 0; // Calculate the amount of data being sent // for (int i = 1; i < ioN; i++) dlen += ioV[i].iov_len; // Set the destination // ReqID.setID(eInfo->getErrArg()); // Send the async response // if (XrdXrootdResponse::Send(ReqID, Status, ioV, ioN, dlen) < 0) eDest->Emsg("sendResp", eInfo->getErrUser(), Opname, "async resp aborted; user gone."); else if (TRACING(TRACE_RSP)) {XrdXrootdResponse theResp; theResp.Set(ReqID.Stream()); TRACE(RSP, eInfo->getErrUser() <<" async " <extData()) eInfo->Reset(); } /******************************************************************************/ /* S e t V a l s */ /******************************************************************************/ void XrdXrootdCallBack::setVals(XrdSysError *erp, XrdXrootdStats *SIp, XrdScheduler *schp, int port) { // Set values into out unnamed static space // eDest = erp; SI = SIp; Sched = schp; Port = port; }