/******************************************************************************/
/* */
/* X r d P o s i x F i l e . c c */
/* */
/* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdOuc/XrdOucName2Name.hh"
#include "XrdPosix/XrdPosixCallBack.hh"
#include "XrdPosix/XrdPosixFile.hh"
#include "XrdPosix/XrdPosixFileRH.hh"
#include "XrdPosix/XrdPosixPrepIO.hh"
#include "XrdPosix/XrdPosixTrace.hh"
#include "XrdPosix/XrdPosixXrootdPath.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysTimer.hh"
/******************************************************************************/
/* S t a t i c M e m b e r s */
/******************************************************************************/
// State shared between the XrdPosix components. Everything declared extern
// is defined in some other translation unit; only ddNumLost is defined here.
//
namespace XrdPosixGlobals
{
// Delayed destroy tuning and accounting (see XrdPosixFile::DelayedDestroy)
//
extern int              ddMaxTries;    // limit that numTries is checked against
extern int              ddInterval;    // value passed to XrdSysTimer::Snooze()
       int              ddNumLost = 0; // files given up on after too many tries

// Message router and plug-ins. Each of these is tested for null before use.
//
extern XrdSysError     *eDest;
extern XrdOucName2Name *theN2N;
extern XrdOucCache2    *theCache;
}
namespace
{
// Property name handed to clFile.GetProperty() by XrdPosixFile::Location().
//
std::string dsProperty("DataServer");

// Start the thread that executes XrdPosixFile::DelayedDestroy() (it is given
// a null argument) and return the null pointer that becomes the initial
// value of XrdPosixFile::ddList. The result of the thread start is not
// inspected.
//
XrdPosixFile *InitDDL()
{
   pthread_t ddThread;

   XrdSysThread::Run(&ddThread, XrdPosixFile::DelayedDestroy, 0, 0,
                     "PosixFileDestroy");

   return static_cast<XrdPosixFile *>(0);
}
}
// Delayed destroy bookkeeping. The initializer of ddList calls InitDDL(),
// which starts the thread running DelayedDestroy(); that thread uses ddSem
// and ddMutex. Objects with dynamic initialization in one translation unit
// are initialized in definition order, so ddSem and ddMutex need to stay
// ahead of ddList.
//
XrdSysSemaphore XrdPosixFile::ddSem(0);
XrdSysMutex XrdPosixFile::ddMutex;
// ddList: files queued for delayed destruction (guarded by ddMutex).
// ddLost: files DelayedDestroy() gave up on after exceeding ddMaxTries.
//
XrdPosixFile *XrdPosixFile::ddList = InitDDL();
XrdPosixFile *XrdPosixFile::ddLost = 0;
// sfSFX/sfSLN: suffix and suffix length the constructor compares against the
// tail of the path; when sfSFX is null that check is skipped.
//
char *XrdPosixFile::sfSFX = 0;
short XrdPosixFile::sfSLN = 0;
// ddPosted: set while a post of ddSem is outstanding, so that it is posted
// only once per batch. ddNum: number of entries currently chained on ddList.
//
bool XrdPosixFile::ddPosted = false;
int XrdPosixFile::ddNum = 0;
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
XrdPosixFile::XrdPosixFile(bool &aOK, const char *path, XrdPosixCallBack *cbP,
                           int   Opts)
             : XCio((XrdOucCacheIO2 *)this), PrepIO(0),
               mySize(0), myMtime(0), myInode(0), myMode(0),
               theCB(cbP), fLoc(0), cOpt(0),
               isStream(Opts & isStrm ? 1 : 0)
{
// Construct a file object for path.
//   aOK  - set to true, or to false when the path translation fails.
//   path - the name being opened; a private copy is kept in fOpen.
//   cbP  - callback object saved in theCB.
//   Opts - option bits; isStrm and isUpdt are the ones examined here.
//
   aOK   = true;
   fOpen = strdup(path);

// Handle path generation. This is tricky as we may have two namespaces: one
// for the origin and one for the cache. A translation is attempted only when
// both a name-to-name object and a cache are present. Without a translation,
// or when the translation leaves fPath null, fPath shares fOpen's storage.
//
   if (XrdPosixGlobals::theN2N && XrdPosixGlobals::theCache)
      {if (!XrdPosixXrootPath::P2L("file", path, fPath)) aOK = false;
          else if (!fPath) fPath = fOpen;
      }
      else fPath = fOpen;

// Structured file check: select optFIS when a suffix is configured and path
// is longer than the suffix and ends with it.
//
   if (sfSFX)
      {const int  pathLen  = strlen(path);
       const bool sfxMatch = pathLen > sfSLN
                          && strcmp(sfSFX, path + pathLen - sfSLN) == 0;
       if (sfxMatch) cOpt = XrdOucCache::optFIS;
      }

// Add the cache update option for files opened for update
//
   if (Opts & isUpdt) cOpt |= XrdOucCache::optRW;
}
/******************************************************************************/
/* D e s t r u c t o r */
/******************************************************************************/
XrdPosixFile::~XrdPosixFile()
{
// XCio refers to this object until a cache I/O object replaces it; detach
// only in the latter case.
//
   if (XCio != this) XCio->Detach();

// Close the remote connection. The resulting status is discarded as there is
// nobody to report it to.
//
   if (clFile.IsOpen()) (void)clFile.Close();

// Get rid of the deferred open object (deleting a null pointer is a no-op)
//
   delete PrepIO;

// Free the path and location information. fOpen may be the very same
// allocation as fPath, so it is released only when it is a distinct one.
//
   if (fOpen != fPath) free(fOpen);
   free(fPath);
   free(fLoc);
}
/******************************************************************************/
/* D e l a y e d D e s t r o y */
/******************************************************************************/
void* XrdPosixFile::DelayedDestroy(void* vpf)
{
// Static function.
// Called within a dedicated thread if XrdOucCacheIO is io-active or the
// file cannot be closed in a clean fashion for some reason.
//
// The thread is started by InitDDL() with a null argument; vpf is not used
// in the code below. Files are taken from ddList, destroyed when possible,
// put back for another attempt otherwise, and moved to ddLost once their try
// count exceeds XrdPosixGlobals::ddMaxTries.
//
EPNAME("DDestroy");
XrdCl::XRootDStatus Status;
std::string statusMsg;
const char *eTxt;
XrdPosixFile *fCurr, *fNext;
int ddCount;
bool ioActive, doWait = false;
// Wait for active I/O to complete. With doWait clear the thread blocks on
// ddSem and then loops around with doWait set, so every pass over the list
// is preceded by a snooze of ddInterval. A pass that puts any file back on
// the list sets doWait again and so retries without waiting for a post.
//
do{if (doWait)
{XrdSysTimer::Snooze(XrdPosixGlobals::ddInterval);
doWait = false;
} else {
ddSem.Wait();
doWait = true;
continue;
}
// Grab the delayed delete list. The list head and count are taken, and the
// shared list, count and posted flag are reset, while holding ddMutex.
//
ddMutex.Lock();
fNext=ddList; ddList=0; ddPosted=false; ddCount = ddNum; ddNum = 0;
ddMutex.UnLock();
// Do some debugging
//
DEBUG("DLY destory of "<nextFile;
// A file is closed and deleted only when its cache I/O object reports no
// active I/O and the file holds no references. When that is not possible,
// eTxt records why: the close status text, "active I/O" or "callback".
//
if (!((ioActive = fCurr->XCio->ioActive())) && !fCurr->Refs())
{if (fCurr->Close(Status)) {delete fCurr; ddCount--; continue;}
else {statusMsg = Status.ToString();
eTxt = statusMsg.c_str();
}
} else eTxt = (ioActive ? "active I/O" : "callback");
// Too many attempts: count the file as lost, report it, chain it onto ddLost
// and issue one last Close() whose result is ignored.
//
if (fCurr->numTries > XrdPosixGlobals::ddMaxTries)
{XrdPosixGlobals::ddNumLost++; ddCount--;
if (XrdPosixGlobals::eDest)
{char buff[256];
// NOTE(review): buff is formatted here but is not among the arguments given
// to Emsg() below -- confirm whether it was meant to be appended.
snprintf(buff, sizeof(buff), "(%d) %s",
XrdPosixGlobals:: ddNumLost, eTxt);
XrdPosixGlobals::eDest->Emsg("DDestroy",
"timeout closing", fCurr->Origin());
} else {
DMSG("DDestroy", eTxt <<" timeout closing " <Origin()
<<' ' <nextFile = ddLost;
ddLost = fCurr;
fCurr->Close(Status);
} else {
// Not at the limit yet: bump the try count and put the file back at the head
// of ddList (under ddMutex) so that it is retried after the next snooze.
//
fCurr->numTries++;
doWait = true;
ddMutex.Lock();
fCurr->nextFile = ddList; ddList = fCurr;
ddNum++; ddPosted = true;
ddMutex.UnLock();
}
}
DEBUG("DLY destory end; "<nextFile = ddList;
// The statements from here on place fp at the head of ddList and post ddSem
// only when no post is already outstanding (ddPosted).
//
ddList = fp;
ddNum++; ddCount = ddNum;
if (ddPosted) doPost = false;
else {doPost = true;
ddPosted = true;
}
ddMutex.UnLock();
// NOTE(review): numTries is reset after ddMutex has been released, at which
// point fp is already visible on ddList -- confirm this ordering is intended.
//
fp->numTries = 0;
DEBUG("DLY destory "<<(doPost ? "post " : "has ")<Origin());
if (doPost) ddSem.Post();
}
/******************************************************************************/
/* C l o s e */
/******************************************************************************/
bool XrdPosixFile::Close(XrdCl::XRootDStatus &Status)
{
// Close the underlying client file.
//   Status - receives the result of the close, but only when a close is
//            actually issued; it is left untouched otherwise.
// Returns true when the file was not open or the close succeeded.
//
// If this is a deferred open, disable any future calls first as we are about
// to shut this object down.
//
   if (PrepIO) PrepIO->Disable();

// Nothing to close means success. We should have already been removed from
// the file table at this point and should be unlocked.
//
   if (!clFile.IsOpen()) return true;

// Do the actual close and report how it went
//
   Status = clFile.Close();
   return Status.IsOK();
}
/******************************************************************************/
/* F i n a l i z e */
/******************************************************************************/
bool XrdPosixFile::Finalize(XrdCl::XRootDStatus *Status)
{
// Complete the initialization of this file object.
//   Status - when null, no stat is issued and the PrepIO object is what gets
//            attached to the cache; otherwise the file is stat'ed, with the
//            result placed in *Status, and this object is what is attached.
// Returns false only when the stat fails; the caller then unwinds the open.
//
   XrdOucCacheIO2 *ioVec;

// Indicate that we are at the start of the file
//
   currOffset = 0;

// Establish the correct I/O vector, issuing the stat if one was asked for
//
   if (Status)
      {if (!Stat(*Status)) return false;
       ioVec = (XrdOucCacheIO2 *)this;
      }
      else ioVec = (XrdOucCacheIO2 *)PrepIO;

// Attach to the cache if there is one
//
   if (XrdPosixGlobals::theCache)
      XCio = XrdPosixGlobals::theCache->Attach(ioVec, cOpt);

   return true;
}
/******************************************************************************/
/* F s t a t */
/******************************************************************************/
int XrdPosixFile::Fstat(struct stat &buf)
{
// Fill in the stat fields that this object knows about.
//   buf - the structure to update; only st_size, st_atime, st_mtime,
//         st_ctime, st_blocks, st_ino, st_rdev and st_mode are assigned and
//         every other field is left exactly as the caller supplied it.
// Returns 0 on success. When XCio->FSize() yields a negative value, that
// value is returned (narrowed to int) and buf is not modified.
//
// The size is treated differently here as it may come from a cache and may
// actually trigger a file open if the open was deferred.
//
   const long long theSize = XCio->FSize();
   if (theSize < 0) return static_cast<int>(theSize);

// Return what little we can. All three time stamps are the modification time.
//
   buf.st_size   = theSize;
   buf.st_atime  = buf.st_mtime = buf.st_ctime = myMtime;
   buf.st_blocks = buf.st_size/512+1;
   buf.st_ino    = myInode;
   buf.st_rdev   = myRdev;
   buf.st_mode   = myMode;
   return 0;
}
/******************************************************************************/
/* H a n d l e R e s p o n s e */
/******************************************************************************/
void XrdPosixFile::HandleResponse(XrdCl::XRootDStatus *status,
                                  XrdCl::AnyObject    *response)
{
// Response handler that finishes an open and notifies the callback in theCB.
//   status   - outcome of the request; deleted here.
//   response - response object; deleted here.
// The callback is given fdNum on success, otherwise whatever
// XrdPosixMap::Result() returns for the failing status. A negative result
// also causes this object to delete itself.
//
   XrdPosixCallBack *cbP    = theCB;
   int               result = fdNum;

// If no errors occurred, complete the open; either step may fail
//
   if (status->IsOK())
      {XrdCl::XRootDStatus finStatus;
       if (!Finalize(&finStatus)) result = XrdPosixMap::Result(finStatus);
      }
      else result = XrdPosixMap::Result(*status);

// Issue callback with the correct result
//
   cbP->Complete(result);

// Finish up. Nothing may touch this object once it has been deleted.
//
   delete status;
   delete response;
   if (result < 0) delete this;
}
/******************************************************************************/
/* L o c a t i o n */
/******************************************************************************/
const char *XrdPosixFile::Location()
{
// Return the value of the "DataServer" property of the open file, or null
// when the file is not open or the property cannot be obtained. The text is
// fetched once and kept in fLoc for subsequent calls.
//
   if (!clFile.IsOpen()) return 0;

// Use what we already have, if anything
//
   if (fLoc) return fLoc;

// Otherwise ask the client file and keep a private copy of the answer
//
   std::string dataServer;
   if (clFile.GetProperty(dsProperty, dataServer))
      fLoc = strdup(dataServer.c_str());

   return fLoc;
}
/******************************************************************************/
/* R e a d */
/******************************************************************************/
int XrdPosixFile::Read (char *Buff, long long Offs, int Len)
{
XrdCl::XRootDStatus Status;
uint32_t bytes;
// Issue read and return appropriately.
//
Ref();
Status = clFile.Read((uint64_t)Offs, (uint32_t)Len, Buff, bytes);
unRef();
return (Status.IsOK() ? (int)bytes : XrdPosixMap::Result(Status));
}
/******************************************************************************/
void XrdPosixFile::Read (XrdOucCacheIOCB &iocb, char *buff, long long offs,
                         int rlen)
{
// Asynchronous read of rlen bytes at offset offs into buff; iocb is handed to
// the response handler object allocated here.
//
   XrdPosixFileRH *rhp = XrdPosixFileRH::Alloc(&iocb, this, offs, rlen,
                                               XrdPosixFileRH::isRead);

// Issue the read while holding a reference on this object
//
   Ref();
   XrdCl::XRootDStatus rdStatus = clFile.Read((uint64_t)offs, (uint32_t)rlen,
                                              buff, rhp);

// A request that was accepted needs nothing more from us here
//
   if (rdStatus.IsOK()) return;

// The request was refused: schedule the handler with the negated mapped
// result and give back the reference taken above.
//
   rhp->Sched(-XrdPosixMap::Result(rdStatus));
   unRef();
}
/******************************************************************************/
/* R e a d V */
/******************************************************************************/
int XrdPosixFile::ReadV (const XrdOucIOVec *readV, int n)
{
XrdCl::XRootDStatus Status;
XrdCl::ChunkList chunkVec;
XrdCl::VectorReadInfo *vrInfo = 0;
int nbytes = 0;
// Copy in the vector (would be nice if we didn't need to do this)
//
chunkVec.reserve(n);
for (int i = 0; i < n; i++)
{nbytes += readV[i].size;
chunkVec.push_back(XrdCl::ChunkInfo((uint64_t)readV[i].offset,
(uint32_t)readV[i].size,
(void *)readV[i].data
));
}
// Issue the readv. We immediately delete the vrInfo as w don't need it as a
// readv will succeed only if actually read the number of bytes requested.
//
Ref();
Status = clFile.VectorRead(chunkVec, (void *)0, vrInfo);
unRef();
delete vrInfo;
// Return appropriate result
//
return (Status.IsOK() ? nbytes : XrdPosixMap::Result(Status));
}
/******************************************************************************/
void XrdPosixFile::ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n)
{
// Asynchronous vector read of the n segments described by readV; iocb is
// handed to the response handler object allocated here.
//
   XrdCl::ChunkList chunks;
   int totBytes = 0;

// Copy in the vector (would be nice if we didn't need to do this)
//
   chunks.reserve(n);
   for (int i = 0; i < n; i++)
       {const XrdOucIOVec &seg = readV[i];
        totBytes += seg.size;
        chunks.push_back(XrdCl::ChunkInfo((uint64_t)seg.offset,
                                          (uint32_t)seg.size,
                                          (void *)seg.data
                                         ));
       }

// Allocate the handler, giving it the total number of bytes requested, and
// issue the readv while holding a reference on this object.
//
   XrdPosixFileRH *rhp = XrdPosixFileRH::Alloc(&iocb, this, 0, totBytes,
                                               XrdPosixFileRH::isReadV);
   Ref();
   XrdCl::XRootDStatus rvStatus = clFile.VectorRead(chunks, (void *)0, rhp);

// A request that was accepted needs nothing more from us here
//
   if (rvStatus.IsOK()) return;

// The request was refused: schedule the handler with the negated mapped
// result and give back the reference taken above.
//
   rhp->Sched(-XrdPosixMap::Result(rvStatus));
   unRef();
}
/******************************************************************************/
/* S t a t */
/******************************************************************************/
bool XrdPosixFile::Stat(XrdCl::XRootDStatus &Status, bool force)
{
// Refresh the cached attributes of the open file.
//   Status - receives the result of the stat request.
//   force  - passed through unchanged to clFile.Stat().
// Returns true after myMode, myRdev, myMtime, mySize and myInode have been
// updated; false, with those members untouched, when the request fails.
//
   XrdCl::StatInfo *sInfo = 0;

// Get the stat information from the open file, holding a reference on this
// object while the request is in progress.
//
   Ref();
   Status = clFile.Stat(force, sInfo);
   if (!Status.IsOK())
      {unRef();
       delete sInfo;
       return false;
      }

// Copy over the relevant fields. Each value is converted to the declared
// type of the member it is assigned to; the id string is parsed as a decimal
// number to produce the inode.
//
   myMode  = XrdPosixMap::Flags2Mode(&myRdev, sInfo->GetFlags());
   myMtime = sInfo->GetModTime();
   mySize  = sInfo->GetSize();
   myInode = strtoll(sInfo->GetId().c_str(), 0, 10);

// Delete our status information and return final result
//
   unRef();
   delete sInfo;
   return true;
}
/******************************************************************************/
/* S y n c */
/******************************************************************************/
int XrdPosixFile::Sync()
{
XrdCl::XRootDStatus Status;
// Issue the Sync
//
Ref();
Status = clFile.Sync();
unRef();
// Return result
//
return XrdPosixMap::Result(Status);
}
/******************************************************************************/
void XrdPosixFile::Sync(XrdOucCacheIOCB &iocb)
{
// Asynchronous sync of the open file; iocb is handed to the response handler
// object allocated here.
//
// NOTE(review): unlike the asynchronous Read() and Write() methods, no Ref()
// is taken around this request -- confirm that this is intended.
//
   XrdPosixFileRH *rhp = XrdPosixFileRH::Alloc(&iocb, this, 0, 0,
                                               XrdPosixFileRH::nonIO);

// Issue the sync
//
   XrdCl::XRootDStatus syncStatus = clFile.Sync(rhp);

// A request that was accepted needs nothing more from us here
//
   if (syncStatus.IsOK()) return;

// The request was refused: schedule the handler with the negated mapped result
//
   rhp->Sched(-XrdPosixMap::Result(syncStatus));
}
/******************************************************************************/
/* T r u n c */
/******************************************************************************/
int XrdPosixFile::Trunc(long long Offset)
{
XrdCl::XRootDStatus Status;
// Issue truncate request
//
Ref();
Status = clFile.Truncate((uint64_t)Offset);
unRef();
// Return results
//
return XrdPosixMap::Result(Status);
}
/******************************************************************************/
/* W r i t e */
/******************************************************************************/
int XrdPosixFile::Write(char *Buff, long long Offs, int Len)
{
XrdCl::XRootDStatus Status;
// Issue read and return appropriately
//
Ref();
Status = clFile.Write((uint64_t)Offs, (uint32_t)Len, Buff);
unRef();
return (Status.IsOK() ? Len : XrdPosixMap::Result(Status));
}
/******************************************************************************/
void XrdPosixFile::Write(XrdOucCacheIOCB &iocb, char *buff, long long offs,
                         int wlen)
{
// Asynchronous write of wlen bytes from buff at offset offs; iocb is handed
// to the response handler object allocated here.
//
   XrdPosixFileRH *rhp = XrdPosixFileRH::Alloc(&iocb, this, offs, wlen,
                                               XrdPosixFileRH::isWrite);

// Issue the write while holding a reference on this object
//
   Ref();
   XrdCl::XRootDStatus wrStatus = clFile.Write((uint64_t)offs, (uint32_t)wlen,
                                               buff, rhp);

// A request that was accepted needs nothing more from us here
//
   if (wrStatus.IsOK()) return;

// The request was refused: schedule the handler with the negated mapped
// result and give back the reference taken above.
//
   rhp->Sched(-XrdPosixMap::Result(wrStatus));
   unRef();
}