/******************************************************************************/
/* */
/* X r d O f s H a n d l e . c c */
/* */
/* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include "XrdOfs/XrdOfsHandle.hh"
#include "XrdOfs/XrdOfsStats.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdSys/XrdSysTimer.hh"
/******************************************************************************/
/* L o c a l C l a s s e s */
/******************************************************************************/
/******************************************************************************/
/* X r d O f s H a n O s s */
/******************************************************************************/
class XrdOfsHanOss : public XrdOssDF
{
public:
// Directory oriented methods
int Opendir(const char *, XrdOucEnv &) {return -EBADF;}
int Readdir(char *buff, int blen) {return -EBADF;}
// File oriented methods
int Fstat(struct stat *) {return -EBADF;}
int Fsync() {return -EBADF;}
int Fsync(XrdSfsAio *aiop) {return -EBADF;}
int Ftruncate(unsigned long long) {return -EBADF;}
off_t getMmap(void **addr) {return 0;}
int isCompressed(char *cxidp=0) {return 0;}
int Open(const char *, int, mode_t, XrdOucEnv &) {return -EBADF;}
ssize_t Read(off_t, size_t) {return (ssize_t)-EBADF;}
ssize_t Read(void *, off_t, size_t) {return (ssize_t)-EBADF;}
int Read(XrdSfsAio *aoip) {return (ssize_t)-EBADF;}
ssize_t ReadRaw( void *, off_t, size_t) {return (ssize_t)-EBADF;}
ssize_t Write(const void *, off_t, size_t) {return (ssize_t)-EBADF;}
int Write(XrdSfsAio *aiop) {return (ssize_t)-EBADF;}
// Methods common to both
int Close(long long *retsz=0) {return -EBADF;}
inline int Handle() {return -1;}
XrdOfsHanOss() {}
~XrdOfsHanOss() {}
};
/******************************************************************************/
/* X r d O f s H a n X p r */
/******************************************************************************/
class XrdOfsHanXpr
{
friend class XrdOfsHandle;
public:
void add2Q(int doLK=1);
void Deref()
{xqCV.Lock(); Handle=0; Call=0; xTNew=0; xqCV.UnLock();}
static XrdOfsHanXpr *Get();
void Set(XrdOfsHanCB *cbP, time_t xtm)
{xqCV.Lock(); Call = cbP; xTNew = xtm; xqCV.UnLock();}
XrdOfsHanXpr(XrdOfsHandle *hP, XrdOfsHanCB *cbP, time_t xtm)
: Next(0), Handle(hP), Call(cbP), xTime(xtm), xTNew(0) {}
~XrdOfsHanXpr() {}
private:
XrdOfsHanXpr *Next;
XrdOfsHandle *Handle;
XrdOfsHanCB *Call;
time_t xTime;
time_t xTNew;
static XrdSysCondVar xqCV;
static XrdOfsHanXpr *xprQ;
};
XrdSysCondVar XrdOfsHanXpr::xqCV(0, "HanXpr cv");
XrdOfsHanXpr *XrdOfsHanXpr::xprQ = 0;
/******************************************************************************/
/* X r d O f s H a n P s c */
/******************************************************************************/
class XrdOfsHanPsc
{
public:
union {
XrdOfsHanPsc *Next;
char *User; // -> Owner for posc files (user.pid:fd@host)
};
XrdOfsHanXpr *xprP; // -> Associate Xpr object if active
int Unum; // -> Offset in poscq
short Ulen; // Length of user.pid
short Uhst; // -> Host portion
short Mode; // Mode file is to have
static
XrdOfsHanPsc *Alloc();
void Recycle();
XrdOfsHanPsc() : User(0), xprP(0), Unum(0), Ulen(0),
Uhst(0), Mode(0) {}
~XrdOfsHanPsc() {}
private:
static XrdSysMutex pscMutex;
static XrdOfsHanPsc *Free;
};
XrdSysMutex XrdOfsHanPsc::pscMutex;
XrdOfsHanPsc *XrdOfsHanPsc::Free = 0;
/******************************************************************************/
/* E x t e r n a l L i n k a g e s */
/******************************************************************************/
void *XrdOfsHanXpire(void *pp)
{
XrdOfsHandle::StartXpr();
return (void *)0;
}
extern XrdSysError OfsEroute;
extern XrdOfsStats OfsStats;
/******************************************************************************/
/* S t a t i c O b j e c t s */
/******************************************************************************/
XrdSysMutex XrdOfsHandle::myMutex;
XrdOfsHanTab XrdOfsHandle::roTable;
XrdOfsHanTab XrdOfsHandle::rwTable;
XrdOssDF *XrdOfsHandle::ossDF = (XrdOssDF *)new XrdOfsHanOss;
XrdOfsHandle *XrdOfsHandle::Free = 0;
/******************************************************************************/
/* c l a s s X r d O f s H a n d l e */
/******************************************************************************/
/******************************************************************************/
/* static public A l l o c # 1 */
/******************************************************************************/
int XrdOfsHandle::Alloc(const char *thePath, int Opts, XrdOfsHandle **Handle)
{
XrdOfsHandle *hP;
XrdOfsHanTab *theTable = (Opts & opRW ? &rwTable : &roTable);
XrdOfsHanKey theKey(thePath, (int)strlen(thePath));
int retc;
// Lock the search table and try to find the key. If found, increment the
// the link count (can only be done with the global lock) then release the
// lock and try to lock the handle. It can't escape between lock calls because
// the link count is positive. If we can't lock the handle then it must be the
// that a long running operation is occuring. Return the handle to its former
// state and return a delay. Otherwise, return the handle.
//
myMutex.Lock();
if ((hP = theTable->Find(theKey)))
{hP->Path.Links++; myMutex.UnLock();
if (hP->WaitLock()) {*Handle = hP; return 0;}
myMutex.Lock(); hP->Path.Links--; myMutex.UnLock();
return nolokDelay;
}
// Get a new handle
//
if (!(retc = Alloc(theKey, Opts, Handle))) theTable->Add(*Handle);
OfsStats.Add(OfsStats.Data.numHandles);
// All done
//
myMutex.UnLock();
return retc;
}
/******************************************************************************/
/* static public A l l o c # 2 */
/******************************************************************************/
int XrdOfsHandle::Alloc(XrdOfsHandle **Handle)
{
XrdOfsHanKey myKey("dummy", 5);
int retc;
myMutex.Lock();
if (!(retc = Alloc(myKey, 0, Handle)))
{(*Handle)->Path.Links = 0; (*Handle)->UnLock();}
myMutex.UnLock();
return retc;
}
/******************************************************************************/
/* private A l l o c # 3 */
/******************************************************************************/
int XrdOfsHandle::Alloc(XrdOfsHanKey theKey, int Opts, XrdOfsHandle **Handle)
{
static const int minAlloc = 4096/sizeof(XrdOfsHandle);
XrdOfsHandle *hP;
// No handle currently in the table. Get a new one off the free list
//
if (!Free && (hP = new XrdOfsHandle[minAlloc]))
{int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}}
if ((hP = Free)) Free = hP->Next;
// Initialize the new handle, if we have one, and add it to the table
//
if (hP)
{hP->Path = theKey;
hP->Path.Links = 1;
hP->isChanged = 0; // File changed
hP->isCompressed = 0; // Compression
hP->isPending = 0; // Pending output
hP->isRW = (Opts & opPC); // File mode
hP->ssi = ossDF; // No storage system yet
hP->Posc = 0; // No creator
hP->Lock(); // Wait is not possible
*Handle = hP;
return 0;
}
return nomemDelay; // Delay client
}
/******************************************************************************/
/* static public H i d e */
/******************************************************************************/
void XrdOfsHandle::Hide(const char *thePath)
{
XrdOfsHandle *hP;
XrdOfsHanKey theKey(thePath, (int)strlen(thePath));
// Lock the search table and try to find the key in each table. If found,
// clear the length field to effectively hide the item.
//
myMutex.Lock();
if ((hP = roTable.Find(theKey))) hP->Path.Len = 0;
if ((hP = rwTable.Find(theKey))) hP->Path.Len = 0;
myMutex.UnLock();
}
/******************************************************************************/
/* public P o s c G e t */
/******************************************************************************/
// Warning: the handle must be locked!
int XrdOfsHandle::PoscGet(short &Mode, int Done)
{
XrdOfsHanPsc *pP;
int pnum;
if (Posc)
{pnum = Posc->Unum;
Mode = Posc->Mode;
if (Done)
{pP = Posc; Posc = 0;
if (pP->xprP) {myMutex.Lock(); Path.Links--; myMutex.UnLock();}
pP->Recycle();
}
return pnum;
}
Mode = 0;
return 0;
}
/******************************************************************************/
/* public P o s c S e t */
/******************************************************************************/
// Warning: the handle must be locked!
int XrdOfsHandle::PoscSet(const char *User, int Unum, short Umod)
{
static const char *Who = "?:0.0@?", *Whc = Who+1, *Whh = Who+5;
const char *Col, *At;
int retval = 0;
// If we have no posc object then we may just be able to return
//
if (!Posc)
{if (Unum > 0) Posc = XrdOfsHanPsc::Alloc();
else return 0;
}
// Find the markers in the incomming user
//
if (!(Col = index(User, ':')) || !(At = index(User, '@')))
{User = Who; Col = Whc; At = Whh;}
// If we already have a user check if it matches
//
if (Posc->User)
{if (!Unum)
{if (!strncmp(User, Posc->User, Posc->Ulen)
&& !strcmp(Posc->User + Posc->Uhst, At+1)) return 0;
return -ETXTBSY;
} else {
char buff[1024];
sprintf(buff, "%s to %s for", Posc->User, User);
OfsEroute.Emsg("Posc", "Creator changed from", buff, Path.Val);
if (Unum < 0) Unum = Posc->Unum;
else if (Unum != Posc->Unum) retval = Posc->Unum;
}
free(Posc->User);
}
// Assign creation values
//
Posc->User = strdup(User);
Posc->Ulen = Col - User + 1;
Posc->Uhst = At - User + 1;
Posc->Unum = Unum;
Posc->Mode = Umod;
return retval;
}
/******************************************************************************/
/* public P o s c U s r */
/******************************************************************************/
// Warning: the handle must be locked!
const char *XrdOfsHandle::PoscUsr()
{
if (Posc) return Posc->User;
return "?@?";
}
/******************************************************************************/
/* public R e t i r e */
/******************************************************************************/
// The handle must be locked upon entry! It is unlocked upon exit.
int XrdOfsHandle::Retire(int &retc, long long *retsz, char *buff, int blen)
{
XrdOssDF *mySSI;
int numLeft;
// Get the global lock as the links field can only be manipulated with it.
// Decrement the links count and if zero, remove it from the table and
// place it on the free list. Otherwise, it is still in use.
//
retc = 0;
myMutex.Lock();
if (Path.Links == 1)
{if (buff) strlcpy(buff, Path.Val, blen);
numLeft = 0; OfsStats.Dec(OfsStats.Data.numHandles);
if ( (isRW ? rwTable.Remove(this) : roTable.Remove(this)) )
{if (Posc) {Posc->Recycle(); Posc = 0;}
if (Path.Val) {free((void *)Path.Val); Path.Val = (char *)"";}
Path.Len = 0; mySSI = ssi; ssi = ossDF;
Next = Free; Free = this; UnLock(); myMutex.UnLock();
if (mySSI && mySSI != ossDF)
{retc = mySSI->Close(retsz); delete mySSI;}
} else {
UnLock(); myMutex.UnLock();
OfsEroute.Emsg("Retire", "Lost handle to", buff);
}
} else {numLeft = --Path.Links; UnLock(); myMutex.UnLock();}
return numLeft;
}
/******************************************************************************/
int XrdOfsHandle::Retire(XrdOfsHanCB *cbP, int hTime)
{
static int allOK = StartXpr(1);
XrdOfsHanXpr *xP;
int retc;
// The handle can only be held by one reference and only if it's a POSC and
// defered handling was properly set up.
//
myMutex.Lock();
if (!Posc || !allOK)
{OfsEroute.Emsg("Retire", "ignoring deferred retire of", Path.Val);
if (Path.Links != 1 || !Posc || !cbP) myMutex.UnLock();
else {myMutex.UnLock(); cbP->Retired(this);}
return Retire(retc);
}
myMutex.UnLock();
// If this object already has an xpr object (happens for bouncing connections)
// then reuse that object. Otherwise create a new one and put it on the queue.
//
if (Posc->xprP) Posc->xprP->Set(cbP, hTime+time(0));
else {xP = Posc->xprP = new XrdOfsHanXpr(this, cbP, hTime+time(0));
xP->add2Q();
}
UnLock();
return 0;
}
/******************************************************************************/
/* public S t a r t X p r */
/******************************************************************************/
int XrdOfsHandle::StartXpr(int Init)
{
static int InitDone = 0;
XrdOfsHanXpr *xP;
XrdOfsHandle *hP;
int retc;
// If this is the initial all and we have not been initialized do so
//
if (Init)
{pthread_t tid;
int rc;
if (InitDone) return InitDone == 1;
if ((rc = XrdSysThread::Run(&tid, XrdOfsHanXpire, (void *)0,
0, "Handle Timeout")))
{OfsEroute.Emsg("StartXpr", rc, "create handle timeout thread");
InitDone = -1; return 0;
}
InitDone = 1; return 1;
}
// Simply loop waiting for expired handles to become available. The Get() will
// return an Xpr object with the associated handle locked.
//
do{xP = XrdOfsHanXpr::Get(); hP = xP->Handle;
// Perform validity check on the handle to catch instances where the handle
// was closed while we were in the process of getting it. While this is safe
// it should never happen, so issue a message so we know to fix it.
//
if (hP->Posc && xP == hP->Posc->xprP) hP->Posc->xprP = 0;
else {OfsEroute.Emsg("StarXtpr", "Invalid xpr ref to", hP->Path.Val);
hP->UnLock(); delete xP; continue;
}
// As the handle is locked we can get the global handle lock to prevent
// additions and removals of handles as we need a stable reference count to
// effect the callout, if any. Do so only if the reference count is one (for us)
// and the handle is active. In all cases, drop the global lock.
//
myMutex.Lock();
if (hP->Path.Links != 1 || !xP->Call) myMutex.UnLock();
else {myMutex.UnLock();
xP->Call->Retired(hP);
}
// We can now officially retire the handle and delete the xpr object
//
hP->Retire(retc);
delete xP;
} while(1);
// Keep the compiler happy
//
return 0;
}
/******************************************************************************/
/* public W a i t L o c k */
/******************************************************************************/
int XrdOfsHandle::WaitLock(void)
{
// Try to obtain a lock within the retry parameters
//
if (hMutex.TimedLock(LockTries*LockWait)) return 1;
return 0;
}
/******************************************************************************/
/* C l a s s X r d O f s H a n P s c */
/******************************************************************************/
/******************************************************************************/
/* A l l o c */
/******************************************************************************/
XrdOfsHanPsc *XrdOfsHanPsc::Alloc()
{
XrdOfsHanPsc *pP;
// Grab or allocate an object
//
pscMutex.Lock();
if ((pP = Free)) {Free = pP->Next; pP->Next = 0;}
else pP = new XrdOfsHanPsc;
pscMutex.UnLock();
return pP;
}
/******************************************************************************/
/* R e c y c l e */
/******************************************************************************/
void XrdOfsHanPsc::Recycle()
{
// Release any storgae appendages and clear other field
//
if (xprP) {xprP->Deref(); xprP = 0;}
if (User) free(User);
Unum = 0;
Ulen = 0;
Uhst = 0;
Mode = 0;
// Place element on free chain. We keep them all as there are never too many
//
pscMutex.Lock();
Next = Free; Free = this;
pscMutex.UnLock();
}
/******************************************************************************/
/* C l a s s X r d O f s H a n T a b */
/******************************************************************************/
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
XrdOfsHanTab::XrdOfsHanTab(int psize, int csize)
{
prevtablesize = psize;
nashtablesize = csize;
Threshold = (csize * LoadMax) / 100;
nashnum = 0;
nashtable = (XrdOfsHandle **)
malloc( (size_t)(csize*sizeof(XrdOfsHandle *)) );
memset((void *)nashtable, 0, (size_t)(csize*sizeof(XrdOfsHandle *)));
}
/******************************************************************************/
/* public A d d */
/******************************************************************************/
void XrdOfsHanTab::Add(XrdOfsHandle *hip)
{
unsigned int kent;
// Check if we should expand the table
//
if (++nashnum > Threshold) Expand();
// Add the entry to the table
//
kent = hip->Path.Hash % nashtablesize;
hip->Next = nashtable[kent];
nashtable[kent] = hip;
}
/******************************************************************************/
/* private E x p a n d */
/******************************************************************************/
void XrdOfsHanTab::Expand()
{
int newsize, newent, i;
size_t memlen;
XrdOfsHandle **newtab, *nip, *nextnip;
// Compute new size for table using a fibonacci series
//
newsize = prevtablesize + nashtablesize;
// Allocate the new table
//
memlen = (size_t)(newsize*sizeof(XrdOfsHandle *));
if (!(newtab = (XrdOfsHandle **) malloc(memlen))) return;
memset((void *)newtab, 0, memlen);
// Redistribute all of the current items
//
for (i = 0; i < nashtablesize; i++)
{nip = nashtable[i];
while(nip)
{nextnip = nip->Next;
newent = nip->Path.Hash % newsize;
nip->Next = newtab[newent];
newtab[newent] = nip;
nip = nextnip;
}
}
// Free the old table and plug in the new table
//
free((void *)nashtable);
nashtable = newtab;
prevtablesize = nashtablesize;
nashtablesize = newsize;
// Compute new expansion threshold
//
Threshold = static_cast((static_cast(newsize)*LoadMax)/100);
}
/******************************************************************************/
/* public F i n d */
/******************************************************************************/
XrdOfsHandle *XrdOfsHanTab::Find(XrdOfsHanKey &Key)
{
XrdOfsHandle *nip;
unsigned int kent;
// Compute position of the hash table entry
//
kent = Key.Hash%nashtablesize;
// Find the entry
//
nip = nashtable[kent];
while(nip && nip->Path != Key) nip = nip->Next;
return nip;
}
/******************************************************************************/
/* public R e m o v e */
/******************************************************************************/
int XrdOfsHanTab::Remove(XrdOfsHandle *rip)
{
XrdOfsHandle *nip, *pip = 0;
unsigned int kent;
// Compute position of the hash table entry
//
kent = rip->Path.Hash%nashtablesize;
// Find the entry
//
nip = nashtable[kent];
while(nip && nip != rip) {pip = nip; nip = nip->Next;}
// Remove if found
//
if (nip)
{if (pip) pip->Next = nip->Next;
else nashtable[kent] = nip->Next;
nashnum--;
}
return nip != 0;
}
/******************************************************************************/
/* C l a s s X r d O f s H a n x p r */
/******************************************************************************/
/******************************************************************************/
/* a d d 2 Q */
/******************************************************************************/
void XrdOfsHanXpr::add2Q(int doLK)
{
XrdOfsHanXpr *xPP, *xP;
// Place this object on the defered queue
//
if (doLK) xqCV.Lock();
xPP = 0; xP = xprQ;
while(xP && xP->xTime < xTime) {xPP = xP; xP = xP->Next;}
Next = xP;
if (xPP) {xPP->Next = this; if (doLK) xqCV.UnLock();}
else { xprQ = this; if (doLK) {xqCV.Signal(); xqCV.UnLock();}}
};
/******************************************************************************/
/* public G e t */
/******************************************************************************/
XrdOfsHanXpr *XrdOfsHanXpr::Get()
{
XrdOfsHanXpr *xP;
XrdOfsHandle *hP;
int waitTime = 2592000;
// Obtain the xqCV lock as we need it to inspect/modify the queue and elements
// This lock is automatically released when we wait on the associated condvar.
//
xqCV.Lock();
// Caculate the next wait time based on the first element, if any, in the queue.
// If the wait time is positive then loop back to wait that amount of time. Note
// that we have the xqCV lock that is needed to touch an inq Xpr object.
//
do{do{if (!(xP = xprQ)) waitTime = 2592000;
else waitTime = xP->xTime - time(0);
if (waitTime > 0) break;
xprQ = xP->Next;
// Get the associated file handle. If none, simply delete the Xpr object.
//
if (!(hP = xP->Handle)) {delete xP; continue;}
// If a new wait time is indicated then reschedule this object
//
if (xP->xTNew)
{xP->xTime = xP->xTNew; xP->xTNew = 0;
xP->add2Q(0);
continue;
}
// Since we are still holding the xqCV lock we must get a conditional lock on
// the handle. If we can't then reschedule this object for later.
//
if (!(hP->WaitLock()))
{OfsEroute.Emsg("Retire", "defering retire of", hP->Path.Val);
xP->xTime = time(0)+30;
xP->add2Q(0);
continue;
}
// Drop the xqCV lock prior to returning the Xpr object to the caller. The
// caller will delete the object as needed.
//
xqCV.UnLock();
return xP;
} while(1);
// We have the xqCV lock so we can now wait for an event or a timeout
//
xqCV.Wait(waitTime);
} while(1);
}