/******************************************************************************/
/* */
/* X r d F r m T r a n s f e r . c c */
/* */
/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdFrc/XrdFrcCID.hh"
#include "XrdFrc/XrdFrcRequest.hh"
#include "XrdFrc/XrdFrcTrace.hh"
#include "XrdFrc/XrdFrcXAttr.hh"
#include "XrdFrm/XrdFrmCns.hh"
#include "XrdFrm/XrdFrmConfig.hh"
#include "XrdFrm/XrdFrmMonitor.hh"
#include "XrdFrm/XrdFrmTransfer.hh"
#include "XrdFrm/XrdFrmXfrJob.hh"
#include "XrdFrm/XrdFrmXfrQueue.hh"
#include "XrdNet/XrdNetCmsNotify.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucMsubs.hh"
#include "XrdOuc/XrdOucProg.hh"
#include "XrdOuc/XrdOucSxeq.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdOuc/XrdOucXAttr.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysFD.hh"
#include "XrdSys/XrdSysPlatform.hh"
using namespace XrdFrc;
using namespace XrdFrm;
/******************************************************************************/
/* L o c a l C l a s s e s */
/******************************************************************************/
struct XrdFrmTranArg
{
XrdOucEnv *theEnv;
XrdOucProg *theCmd;
XrdOucMsubs *theVec;
char *theSrc;
char *theDst;
char *theINS;
char theMDP[8];
XrdFrmTranArg(XrdOucEnv *Env)
: theEnv(Env), theCmd(0), theVec(0), theSrc(0),
theDst(0), theINS(0)
{theMDP[0] = '0'; theMDP[1] = 0;}
~XrdFrmTranArg() {}
};
struct XrdFrmTranChk
{ struct stat *Stat;
int lkfd;
int lkfx;
XrdFrmTranChk(struct stat *sP) : Stat(sP), lkfd(-1), lkfx(0) {}
~XrdFrmTranChk() {if (lkfd >= 0) close(lkfd);}
};
/******************************************************************************/
/* S t a t i c s */
/******************************************************************************/
XrdSysMutex XrdFrmTransfer::pMutex;
XrdOucHash XrdFrmTransfer::pTab;
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
XrdFrmTransfer::XrdFrmTransfer()
{
int i;
// Construct program objects
//
for (i = 0; i < 4; i++)
xfrCmd[i] = (Config.xfrCmd[i].theVec ? new XrdOucProg(&Say) : 0);
}
/******************************************************************************/
/* Public: c h e c k F F */
/******************************************************************************/
const char *XrdFrmTransfer::checkFF(const char *Path)
{
EPNAME("checkFF");
struct stat buf;
// Check for a fail file
//
if (!stat(Path, &buf))
{if (buf.st_ctime+Config.FailHold >= time(0))
return "request previously failed";
if (Config.Test) {DEBUG("would have removed '" <reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
XrdFrmTranArg cmdArg(&myEnv);
struct stat pfnStat;
time_t xfrET;
const char *eTxt, *retMsg = 0;
char lfnpath[MAXPATHLEN+1024+512+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
char pdBuff[1024];
int iXfr, pdSZ, lfnEnd, rc, isURL = 0, doRM = 0;
long long fSize = 0;
// The remote source is either the url-lfn or a translated lfn
//
if ((isURL = xfrP->reqData.LFO)) theSrc = xfrP->reqData.LFN;
else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
return "lfn2rfn failed";
theSrc = Rfn;
isURL = (*Rfn != '/');
}
// Check if we can actually handle this transfer
//
if (isURL)
{if (xfrCmd[2]) iXfr = 2;
else return "url copies not configured";
} else {
if (xfrCmd[0]) iXfr = 0;
else return "non-url copies not configured";
}
// Check for a fail file
//
if ((eTxt = ffCheck())) return eTxt;
// Check if the file exists
//
Lfn = (xfrP->reqData.LFN)+xfrP->reqData.LFO;
if (!Config.Stat(Lfn, xfrP->PFN, &pfnStat))
{DEBUG(xfrP->PFN <<" exists; not fetched.");
return 0;
}
// Construct the file name to which to we originally transfer the data. This is
// the lfn if we do not pre-allocate files and "lfn.anew" otherwise.
//
lfnEnd = strlen(Lfn);
strlcpy(lfnpath, Lfn, sizeof(lfnpath)-8);
if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
{strcpy(&lfnpath[lfnEnd], ".anew");
strcpy(&xfrP->PFN[xfrP->pfnEnd], ".anew");
}
// Setup the command
//
cmdArg.theCmd = xfrCmd[iXfr];
cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
cmdArg.theSrc = theSrc;
cmdArg.theDst = xfrP->PFN;
cmdArg.theINS = xfrP->reqData.iName;
if (!SetupCmd(&cmdArg)) return "incoming transfer setup failed";
// If the copycmd needs a placeholder in the filesystem for this transfer, we
// must create one. We first remove any existing "anew" file because we will
// over-write it. The create process will create a lock file if need be.
// However, we can ignore it as we are the only ones actually using it.
//
if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
{Config.ossFS->Unlink(lfnpath);
rc = Config.ossFS->Create(xfrP->reqData.User,lfnpath,fMode,myEnv,crOpts);
if (rc)
{Say.Emsg("Fetch", rc, "create placeholder for", lfnpath);
return "create failed";
}
doRM = 1;
} else doRM = Config.xfrCmd[iXfr].Opts & Config.cmdRME;
// Setup program monitoring data
//
pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
// Now run the command to get the file and make sure the file is there
// If it is, make sure that if a lock file exists its date/time is greater than
// the file we just fetched; then rename it to be the correct name.
//
xfrET = time(0);
if (!(rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
{if ((rc = Config.Stat(lfnpath, xfrP->PFN, &pfnStat)))
{Say.Emsg("Fetch", lfnpath, "fetched but not resident!"); fSize = 0;}
else {fSize = pfnStat.st_size;
if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
FetchDone(lfnpath, pfnStat, rc);
}
}
// Clean up if we failed otherwise tell the cmsd that we have a new file. Upon
// failure we issue a a remove as we don't want the temp file to exist.
//
xfrP->PFN[xfrP->pfnEnd] = '\0';
if (rc)
{if (doRM) Config.ossFS->Unlink(lfnpath);
ffMake(rc == -2);
if (rc == -2) {xfrP->RetCode = 2; retMsg = "file not found";}
else retMsg = "fetch failed";
} else if (Config.cmsPath) Config.cmsPath->Have(Lfn);
// We completed, see if we need to do statistics
//
if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) || XrdFrmMonitor::monSTAGE
|| (Trace.What & TRACE_Debug))
{time_t eNow = time(0);
int inqT, xfrT;
inqT = static_cast(xfrET - time_t(xfrP->reqData.addTOD));
if ((xfrT = static_cast(eNow - xfrET)) <= 0) xfrT = 1;
if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
|| (Trace.What & TRACE_Debug)) && !retMsg)
{char sbuff[80];
sprintf(sbuff, "Got: %lld qt: %d xt: %d up: ", fSize, inqT, xfrT);
lfnpath[lfnEnd] = '\0';
Say.Say(0, sbuff, xfrP->reqData.User, " ", lfnpath);
}
if (XrdFrmMonitor::monSTAGE)
{if (rc < 0) rc = -rc;
snprintf(lfnpath+lfnEnd, sizeof(lfnpath)-lfnEnd-1,
"\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
static_cast(eNow), fSize, inqT, xfrT,
xfrP->Act, rc, (pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
XrdFrmMonitor::Map(XROOTD_MON_MAPSTAG,xfrP->reqData.User,lfnpath);
}
}
// All done
//
return retMsg;
}
/******************************************************************************/
/* F e t c h D o n e */
/******************************************************************************/
const char *XrdFrmTransfer::FetchDone(char *lfnpath, struct stat &Stat, int &rc)
{
// If we are running in new mode, update file attributes
//
rc = 0;
if (Config.runNew && Config.NeedsCTA(lfnpath))
{XrdOucXAttr cpyInfo;
cpyInfo.Attr.cpyTime = static_cast(Stat.st_mtime);
if ((rc = cpyInfo.Set(xfrP->PFN)))
Say.Emsg("Fetch", rc, "set copy time xattr on", xfrP->PFN);
}
// Check for a lock file and if we have one, reset it's time or delete it
//
if (Config.runOld && Config.NeedsCTA(lfnpath))
{struct stat lkfStat;
strcpy(&xfrP->PFN[xfrP->pfnEnd+5], ".lock");
if (!stat(xfrP->PFN, &lkfStat))
{if (Config.runNew && !rc) unlink(xfrP->PFN);
else {struct utimbuf tbuff;
tbuff.actime = tbuff.modtime = Stat.st_mtime;
if ((rc = utime(xfrP->PFN, &tbuff)))
Say.Emsg("Fetch", rc, "set utime on", xfrP->PFN);
}
}
}
// Now rename the lfn to be what it needs to be in the end
//
if (!rc && (rc=Config.ossFS->Rename(lfnpath,xfrP->reqData.LFN)))
Say.Emsg("Fetch", rc, "rename", lfnpath);
else XrdFrmCns::Add(xfrP->reqData.User, xfrP->reqData.LFN,
Stat.st_size, Stat.st_mode);
// Done
//
return (rc ? "Failed" : 0);
}
/******************************************************************************/
/* Private: f f C h e c k */
/******************************************************************************/
const char *XrdFrmTransfer::ffCheck()
{
const char *eTxt;
// Generate proper fail file path and check if it exists
//
if (Config.xfrFdir)
{char ffPath[MAXPATHLEN+8];
if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return 0;
strcpy(ffPath, Config.xfrFdir);
strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
eTxt = checkFF(ffPath);
} else {
strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
eTxt = checkFF(xfrP->PFN);
xfrP->PFN[xfrP->pfnEnd] = '\0';
}
// Determine result
//
if (eTxt) xfrP->RetCode = 1;
return eTxt;
}
/******************************************************************************/
/* Private: f f M a k e */
/******************************************************************************/
void XrdFrmTransfer::ffMake(int nofile){
static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
static const mode_t dMode = S_IXUSR|S_IWGRP|S_IXGRP|S_IXOTH | fMode;
char ffPath[MAXPATHLEN+8], *ffP;
int myFD;
// Generate fail file path
//
if (Config.xfrFdir)
{if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return;
strcpy(ffPath, Config.xfrFdir);
strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
XrdOucUtils::makePath(ffPath, dMode);
ffP = ffPath;
} else {
strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
ffP = xfrP->PFN;
}
// Create a fail file and if failure is due to "file not found" set the mtime
// to 2 so that the oss layer picks up the same error in the future.
//
myFD = open(ffP, O_CREAT, fMode);
if (myFD >= 0)
{close(myFD);
if (nofile)
{struct utimbuf tbuff;
tbuff.actime = time(0); tbuff.modtime = 2;
utime(ffP, &tbuff);
}
}
if (!Config.xfrFdir) xfrP->PFN[xfrP->pfnEnd] = '\0';
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
void *InitXfer(void *parg)
{ XrdFrmTransfer *xP = new XrdFrmTransfer;
xP->Start();
return (void *)0;
}
int XrdFrmTransfer::Init()
{
pthread_t tid;
int retc, n;
// Initialize the cluster identification object first
//
CID.Init(Config.QPath);
// Initialize the transfer queue first
//
if (!XrdFrmXfrQueue::Init()) return 0;
// Start the required number of transfer threads
//
n = Config.xfrMax;
while(n--)
{if ((retc = XrdSysThread::Run(&tid, InitXfer, (void *)0,
XRDSYSTHREAD_BIND, "transfer")))
{Say.Emsg("main", retc, "create xfr thread"); return 0;}
}
// All done
//
return 1;
}
/******************************************************************************/
/* Private: S e t u p C m d */
/******************************************************************************/
int XrdFrmTransfer::SetupCmd(XrdFrmTranArg *argP)
{
char *pdata[XrdOucMsubs::maxElem + 2], *cP;
int pdlen[XrdOucMsubs::maxElem + 2], i, k, n;
XrdOucMsubsInfo
Info(xfrP->reqData.User, argP->theEnv, Config.the_N2N,
xfrP->reqData.LFN+xfrP->reqData.LFO,
argP->theSrc, xfrP->reqData.Prty,
xfrP->reqData.Options & XrdFrcRequest::makeRW?O_RDWR:O_RDONLY,
argP->theMDP, xfrP->reqData.ID, xfrP->PFN, argP->theDst);
// We must establish the host, cluster and instance name if we have one
//
if (argP->theEnv)
{argP->theEnv->Put(SEC_HOST, Config.myName);
if (argP->theINS)
{CID.Get(argP->theINS, CMS_CID, argP->theEnv);
argP->theEnv->Put(XRD_INS, argP->theINS);
}
}
// Substitute in the parameters
//
k = argP->theVec->Subs(Info, pdata, pdlen);
// Catenate all of the arguments
//
*cmdBuff = '\0'; n = sizeof(cmdBuff) - 4; cP = cmdBuff;
for (i = 0; i < k; i++)
{n -= pdlen[i];
if (n < 0)
{Say.Emsg("Setup",E2BIG,"build command line for", xfrP->reqData.LFN);
return 0;
}
strcpy(cP, pdata[i]); cP += pdlen[i];
}
// Now setup the command
//
return (argP->theCmd->Setup(cmdBuff, &Say) == 0);
}
/******************************************************************************/
/* Public: S t a r t */
/******************************************************************************/
void XrdFrmTransfer::Start()
{
EPNAME("Transfer"); // Wrong but looks better
const char *Msg;
// Prime I/O queue selection
// Endless loop looking for transfer jobs
//
while(1)
{xfrP = XrdFrmXfrQueue::Get();
DEBUG(xfrP->Type <<" starting " <reqData.LFN
<<" for " <reqData.User);
Msg = (xfrP->qNum & XrdFrcRequest::outQ ? Throw() : Fetch());
if (Msg && !(xfrP->RetCode)) xfrP->RetCode = 1;
xfrP->PFN[xfrP->pfnEnd] = 0;
if (xfrP->RetCode || Config.Verbose)
{char buff1[280], buff2[80];
sprintf(buff1, "%s for %s", xfrP->RetCode ? "failed" : "complete",
xfrP->reqData.User);
if (xfrP->RetCode == 0) *buff2 = 0;
else sprintf(buff2, "; %s", (Msg ? Msg : "reason unknown"));
Say.Say(0, xfrP->Type, buff1, xfrP->reqData.LFN,buff2);
} else {
DEBUG(xfrP->Type
<<(xfrP->RetCode ? " failed " : " complete ")
<< xfrP->reqData.LFN <<" rc=" <RetCode
<<' ' <<(Msg ? Msg : ""));
}
XrdFrmXfrQueue::Done(xfrP, Msg);
}
}
/******************************************************************************/
/* Private: T r a c k D C */
/******************************************************************************/
int XrdFrmTransfer::TrackDC(char *Lfn, char *Mdp, char *Rfn)
{
char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
int n = -1;
// If this is a url, then don't back space into the url part
//
if (*Rfn != '/'
&& (Slash = index(Rfn, '/')) && *(Slash+1) == '/'
&& (Slash = index(Slash+2, '/')) && *(Slash+1) == '/') begRfn = Slash+1;
// Discard the filename component
//
if (!(FName = rindex(begRfn, '/')) || FName == begRfn) return 0;
*FName = 0; Slash = Slush = FName;
// Try to find the created directory path
//
pMutex.Lock();
while(Slash != begRfn && !pTab.Find(Rfn))
{do {Slash--;} while(Slash != begRfn && *Slash != '/');
if (Slush) *Slush = '/';
*Slash = 0; Slush = Slash;
n++;
}
pMutex.UnLock();
// Compute offset of uncreated part
//
*Slash = '/';
if (Slash == begRfn) n = 0;
else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
sprintf(Mdp, "%d", n);
// All done
//
return n;
}
/******************************************************************************/
int XrdFrmTransfer::TrackDC(char *Rfn)
{
char *Slash;
// Trim off the trailing end
//
if (!(Slash = rindex(Rfn, '/')) || Slash == Rfn) return 0;
*Slash = 0;
// The path has been added, do insert it into the table of created paths
//
pMutex.Lock();
pTab.Add(Rfn, 0, 0, Hash_data_is_key);
pMutex.UnLock();
*Slash = '/';
return 0;
}
/******************************************************************************/
/* T h r o w */
/******************************************************************************/
const char *XrdFrmTransfer::Throw()
{
XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
XrdFrmTranArg cmdArg(&myEnv);
struct stat begStat, endStat;
XrdFrmTranChk Chk(&begStat);
time_t xfrET;
const char *eTxt, *retMsg = 0;
char Rfn[MAXPATHLEN+256], *lfnpath = xfrP->reqData.LFN, *theDest;
char pdBuff[1024];
int isMigr = xfrP->reqData.Options & XrdFrcRequest::Migrate;
int iXfr, isURL, pdSZ, rc, mDP = -1;
// The remote source is either the url-lfn or a translated lfn
//
if ((isURL = xfrP->reqData.LFO)) theDest = xfrP->reqData.LFN;
else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
return "lfn2rfn failed";
theDest = Rfn;
isURL = (*Rfn != '/');
}
// Check if we can actually handle this transfer
//
if (isURL)
{if (xfrCmd[3]) iXfr = 3;
else return "url copies not configured";
} else {
if (xfrCmd[1]) iXfr = 1;
else return "non-url copies not configured";
}
// Check if the file exists (we only copy resident files)
//
if (Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &begStat))
return (xfrP->reqFQ ? "file not found" : 0);
// Check for a fail file
//
if ((eTxt = ffCheck())) return eTxt;
// If this is an mss migration request, then recheck if the file can and
// need to be migrated based on the lock file. This also obtains a directory
// lock and lock file lock, as needed. If the file need not be migrated but
// should be purge, we will get a null string error.
//
if (isMigr && (eTxt = ThrowOK(&Chk)))
{if (*eTxt) return eTxt;
if (!(xfrP->reqData.Options & XrdFrcRequest::Purge)) return "logic error";
Throwaway();
return 0;
}
// Setup the command, including directory tracking, as needed
//
cmdArg.theCmd = xfrCmd[iXfr];
cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
cmdArg.theDst = theDest;
cmdArg.theSrc = xfrP->PFN;
cmdArg.theINS = xfrP->reqData.iName;
if (Config.xfrCmd[iXfr].Opts & Config.cmdMDP)
mDP = TrackDC(lfnpath+xfrP->reqData.LFO, cmdArg.theMDP, Rfn);
if (!SetupCmd(&cmdArg)) return "outgoing transfer setup failed";
// Setup program monitoring data
//
pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
// Now run the command to put the file. If the command fails and this is a
// migration request, cretae a fail file if one does not exist.
//
xfrET = time(0);
if ((rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
{if (isMigr) ffMake(rc == -2);
retMsg = "copy failed";
}
// Track directory creations if we need to track them
//
if (!rc && mDP >= 0) TrackDC(Rfn);
// Obtain state of the file after the copy and make sure the file was not
// modified during the copy. This is an error for queued requests but
// internally generated requests will simply be retried.
//
if (!rc)
{if ((rc = Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &endStat)))
{Say.Emsg("Throw", lfnpath, "transfered but not found!");
retMsg = "unable to verify copy";
} else {
if (begStat.st_mtime != endStat.st_mtime
|| begStat.st_size != endStat.st_size)
{Say.Emsg("Throw", lfnpath, "modified during transfer!");
retMsg = "file modified during copy"; rc = 1;
}
}
}
// Purge the file if so wanted. Otherwise, if this is a migration request,
// make sure that if a lock file exists its date/time is equal to the file
// we just copied to prevent the file from being copied again (we have a lock).
//
if (!rc)
{if (xfrP->reqData.Options & XrdFrcRequest::Purge) Throwaway();
else if (isMigr) ThrowDone(&Chk, endStat.st_mtime);
}
// Do statistics if so wanted
//
if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) || XrdFrmMonitor::monMIGR
|| (Trace.What & TRACE_Debug))
{time_t eNow = time(0);
int inqT, xfrT;
long long Fsize = begStat.st_size;
inqT = static_cast(xfrET - time_t(xfrP->reqData.addTOD));
if ((xfrT = static_cast(eNow - xfrET)) <= 0) xfrT = 1;
if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
|| (Trace.What & TRACE_Debug)) && !rc)
{char sbuff[80];
sprintf(sbuff, "Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
Say.Say(0, sbuff, xfrP->reqData.User, " ", xfrP->reqData.LFN);
}
if (XrdFrmMonitor::monMIGR)
{char monBuff[MAXPATHLEN+1024+512+8];
if (rc < 0) rc = -rc;
snprintf(monBuff, sizeof(monBuff),
"%s\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
xfrP->reqData.LFN, static_cast(eNow), Fsize,
inqT, xfrT, xfrP->Act, rc,
(pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
XrdFrmMonitor::Map(XROOTD_MON_MAPMIGR,xfrP->reqData.User,monBuff);
}
}
// All done
//
return retMsg;
}
/******************************************************************************/
/* Private: T h r o w a w a y */
/******************************************************************************/
void XrdFrmTransfer::Throwaway()
{
EPNAME("Throwaway");
// Purge the file. We do this via the pfn but also indicate we want all
// migration support suffixes removed it they exist. Notify the cmsd & cnsd.
//
if (Config.Test) {DEBUG("Would have removed '" <PFN <<"'");}
else {Config.ossFS->Unlink(xfrP->PFN, XRDOSS_isPFN|XRDOSS_isMIG);
DEBUG("removed '" <PFN <<"'");
if (Config.cmsPath) Config.cmsPath->Gone(xfrP->PFN);
XrdFrmCns::Rm(xfrP->PFN);
}
}
/******************************************************************************/
/* Private: T h r o w D o n e */
/******************************************************************************/
void XrdFrmTransfer::ThrowDone(XrdFrmTranChk *cP, time_t endTime)
{
// Update file attributes if we are running in new mode, otherwise do
//
if (Config.runNew)
{XrdOucXAttr cpyInfo;
cpyInfo.Attr.cpyTime = static_cast(endTime);
if (cpyInfo.Set(xfrP->PFN, cP->lkfd))
Say.Emsg("Throw", "Unable to set copy time xattr for", xfrP->PFN);
else if (cP->lkfx)
{strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
unlink(xfrP->PFN);
xfrP->PFN[xfrP->pfnEnd] = '\0';
}
} else {
struct stat Stat;
strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
if (!stat(xfrP->PFN, &Stat))
{struct utimbuf tbuff;
tbuff.actime = tbuff.modtime = endTime;
if (utime(xfrP->PFN, &tbuff))
Say.Emsg("Throw", errno, "set utime for", xfrP->PFN);
}
xfrP->PFN[xfrP->pfnEnd] = '\0';
}
}
/******************************************************************************/
/* Private: T h r o w O K */
/******************************************************************************/
const char *XrdFrmTransfer::ThrowOK(XrdFrmTranChk *cP)
{
class fdClose
{public:
int Num;
fdClose() : Num(-1) {}
~fdClose() {if (Num >= 0) close(Num);}
} fnFD;
XrdOucXAttr cpyInfo;
struct stat lokStat;
int statRC;
// Check if the file is in use by checking if we got an exclusive lock
//
if ((fnFD.Num = XrdSysFD_Open(xfrP->PFN, O_RDWR)) < 0)
return "unable to open file";
if (XrdOucSxeq::Serialize(fnFD.Num,XrdOucSxeq::noWait)) return "file in use";
// Get the info on the lock file (enabled if old mode is in effect
//
if (Config.runOld)
{strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
statRC = stat(xfrP->PFN, &lokStat);
xfrP->PFN[xfrP->pfnEnd] = '\0';
} else statRC = 1;
if (statRC && !Config.runNew) return "missing lock file";
// If running in new mode then we must get the extened attribute for this file
// unless we got the lock file time which takes precendence.
//
if (Config.runNew)
{if (!statRC)
cpyInfo.Attr.cpyTime = static_cast(lokStat.st_mtime);
else if (cpyInfo.Get(xfrP->PFN, fnFD.Num) <= 0)
return "unable to get copy time xattr";
}
// Verify the information
//
if (cpyInfo.Attr.cpyTime >= static_cast(cP->Stat->st_mtime))
{if (xfrP->reqData.Options & XrdFrcRequest::Purge) return "";
return "already migrated";
}
// Keep the lock on the base file until we are through. No one is allowed to
// modify this file until we have migrate it.
//
cP->lkfd = fnFD.Num;
cP->lkfx = statRC == 0;
fnFD.Num = -1;
return 0;
}