/******************************************************************************/
/* */
/* X r d O f s P o s c q . c c */
/* */
/* (c) 2009 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdOfs/XrdOfsPoscq.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdSfs/XrdSfsFlags.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysFD.hh"
#include "XrdSys/XrdSysPlatform.hh"
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
XrdOfsPoscq::XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn)
{
eDest = erp;
ossFS = oss;
pocFN = strdup(fn);
pocFD = -1;
pocSZ = 0;
pocIQ = 0;
SlotList = SlotLust = 0;
}
/******************************************************************************/
/* A d d */
/******************************************************************************/
int XrdOfsPoscq::Add(const char *Tident, const char *Lfn)
{
XrdOfsPoscq::Request tmpReq;
FileSlot *freeSlot;
int fP;
// Construct the request
//
tmpReq.addT = 0;
strlcpy(tmpReq.LFN, Lfn, sizeof(tmpReq.LFN));
strlcpy(tmpReq.User, Tident, sizeof(tmpReq.User));
memset(tmpReq.Reserved, 0, sizeof(tmpReq.Reserved));
// Obtain a free slot
//
myMutex.Lock();
if ((freeSlot = SlotList))
{fP = freeSlot->Offset;
SlotList = freeSlot->Next;
freeSlot->Next = SlotLust;
SlotLust = freeSlot;
} else {fP = pocSZ; pocSZ += ReqSize;}
pocIQ++;
myMutex.UnLock();
// Write out the record
//
if (!reqWrite((void *)&tmpReq, sizeof(tmpReq), fP))
{eDest->Emsg("Add", Lfn, "not added to the persist queue.");
myMutex.Lock(); pocIQ--; myMutex.UnLock();
return -EIO;
}
// Return the record offset
//
return fP;
}
/******************************************************************************/
/* C o m m i t */
/******************************************************************************/
int XrdOfsPoscq::Commit(const char *Lfn, int Offset)
{
long long addT = static_cast(time(0));
// Verify the offset it must be correct
//
if (!VerOffset(Lfn, Offset)) return -EINVAL;
// Indicate the record is free
//
if (reqWrite((void *)&addT, sizeof(addT), Offset)) return 0;
eDest->Emsg("Commit", Lfn, "not commited to the persist queue.");
return -EIO;
}
/******************************************************************************/
/* D e l */
/******************************************************************************/
int XrdOfsPoscq::Del(const char *Lfn, int Offset, int Unlink)
{
static int Zero = 0;
FileSlot *freeSlot;
int retc;
// Verify the offset it must be correct
//
if (!VerOffset(Lfn, Offset)) return -EINVAL;
// Unlink the file if need be
//
if (Unlink && (retc = ossFS->Unlink(Lfn)) && retc != -ENOENT)
{eDest->Emsg("Del", retc, "remove", Lfn);
return (retc < 0 ? retc : -retc);
}
// Indicate the record is free
//
if (!reqWrite((void *)&Zero, sizeof(Zero), Offset+offsetof(Request,LFN)))
{eDest->Emsg("Del", Lfn, "not removed from the persist queue.");
return -EIO;
}
// Serialize and place this on the free queue
//
myMutex.Lock();
if ((freeSlot = SlotLust)) SlotLust = freeSlot->Next;
else freeSlot = new FileSlot;
freeSlot->Offset = Offset;
freeSlot->Next = SlotList;
SlotList = freeSlot;
if (pocIQ > 0) pocIQ--;
myMutex.UnLock();
// All done
//
return 0;
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
XrdOfsPoscq::recEnt *XrdOfsPoscq::Init(int &Ok)
{
static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
Request tmpReq;
struct stat buf, Stat;
recEnt *First = 0;
char Buff[80];
int rc, Offs, numreq = 0;
// Assume we will fail
//
Ok = 0;
// Open the file first in r/w mode
//
if ((pocFD = XrdSysFD_Open(pocFN, O_RDWR|O_CREAT, Mode)) < 0)
{eDest->Emsg("Init",errno,"open",pocFN);
return 0;
}
// Get file status
//
if (fstat(pocFD, &buf)) {FailIni("stat"); return 0;}
// Check for a new file here
//
if (buf.st_size < ReqSize)
{pocSZ = ReqOffs;
if (ftruncate(pocFD, ReqOffs)) FailIni("trunc");
else Ok = 1;
return 0;
}
// Read the full file
//
for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
{do {rc = pread(pocFD, (void *)&tmpReq, ReqSize, Offs);}
while(rc < 0 && errno == EINTR);
if (rc < 0) {eDest->Emsg("Init",errno,"read",pocFN); return First;}
if (*tmpReq.LFN == '\0'
|| ossFS->Stat(tmpReq.LFN, &Stat)
|| !(S_ISREG(Stat.st_mode) || !(Stat.st_mode & XRDSFS_POSCPEND))) continue;
First = new recEnt(tmpReq, Stat.st_mode & S_IAMB, First); numreq++;
}
// Now write out the file and return
//
sprintf(Buff, " %d pending create%s", numreq, (numreq != 1 ? "s" : ""));
eDest->Say("Init", Buff, " recovered from ", pocFN);
if (ReWrite(First)) Ok = 1;
return First;
}
/******************************************************************************/
/* L i s t */
/******************************************************************************/
XrdOfsPoscq::recEnt *XrdOfsPoscq::List(XrdSysError *Say, const char *theFN)
{
XrdOfsPoscq::Request tmpReq;
struct stat buf;
recEnt *First = 0;
int rc, theFD, Offs;
// Open the file first in r/o mode
//
if ((theFD = XrdSysFD_Open(theFN, O_RDONLY)) < 0)
{Say->Emsg("Init",errno,"open",theFN);
return 0;
}
// Get file status
//
if (fstat(theFD, &buf))
{Say->Emsg("Init",errno,"stat",theFN);
close(theFD);
return 0;
}
if (buf.st_size < ReqSize) buf.st_size = 0;
// Read the full file
//
for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
{do {rc = pread(theFD, (void *)&tmpReq, ReqSize, Offs);}
while(rc < 0 && errno == EINTR);
if (rc < 0) {Say->Emsg("List",errno,"read",theFN);
close(theFD); return First;
}
if (*tmpReq.LFN != '\0') First = new recEnt(tmpReq, 0, First);
}
// All done
//
close(theFD);
return First;
}
/******************************************************************************/
/* F a i l I n i */
/******************************************************************************/
void XrdOfsPoscq::FailIni(const char *txt)
{
eDest->Emsg("Init", errno, txt, pocFN);
}
/******************************************************************************/
/* r e q W r i t e */
/******************************************************************************/
int XrdOfsPoscq::reqWrite(void *Buff, int Bsz, int Offs)
{
int rc = 0;
do {rc = pwrite(pocFD, Buff, Bsz, Offs);} while(rc < 0 && errno == EINTR);
if (rc >= 0 && Bsz > 8) rc = fsync(pocFD);
if (rc < 0) {eDest->Emsg("reqWrite",errno,"write", pocFN); return 0;}
return 1;
}
/******************************************************************************/
/* R e W r i t e */
/******************************************************************************/
int XrdOfsPoscq::ReWrite(XrdOfsPoscq::recEnt *rP)
{
static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
char newFN[MAXPATHLEN], *oldFN;
int newFD, oldFD, Offs = ReqOffs, aOK = 1;
// Construct new file and open it
//
strcpy(newFN, pocFN); strcat(newFN, ".new");
if ((newFD = XrdSysFD_Open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
{eDest->Emsg("ReWrite",errno,"open",newFN); return 0;}
// Setup to write/swap the file
//
oldFD = pocFD; pocFD = newFD;
oldFN = pocFN; pocFN = newFN;
// Rewrite all records if we have any
//
while(rP)
{rP->Offset = Offs;
if (!reqWrite((void *)&rP->reqData, ReqSize, Offs)) {aOK = 0; break;}
Offs += ReqSize;
rP = rP->Next;
}
// If all went well, rename the file
//
if (aOK && rename(newFN, oldFN) < 0)
{eDest->Emsg("ReWrite",errno,"rename",newFN); aOK = 0;}
// Perform post processing
//
if (aOK) close(oldFD);
else {close(newFD); pocFD = oldFD;}
pocFN = oldFN;
pocSZ = Offs;
return aOK;
}
/******************************************************************************/
/* V e r O f f s e t */
/******************************************************************************/
int XrdOfsPoscq::VerOffset(const char *Lfn, int Offset)
{
// Verify the offset
//
if (Offset < ReqOffs || (Offset-ReqOffs)%ReqSize)
{char buff[128];
sprintf(buff, "Invalid slot %d for", Offset);
eDest->Emsg("VerOffset", buff, Lfn);
return 0;
}
return 1;
}