/******************************************************************************/
/* */
/* X r d O f s T P C P r o g . c c */
/* */
/* (c) 2011 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include "XrdOfs/XrdOfsTPC.hh"
#include "XrdOfs/XrdOfsTPCJob.hh"
#include "XrdOfs/XrdOfsTPCProg.hh"
#include "XrdOfs/XrdOfsTrace.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucCallBack.hh"
#include "XrdOuc/XrdOucProg.hh"
#include "XrdOuc/XrdOucTrace.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysFD.hh"
#include "XrdSys/XrdSysHeaders.hh"
/******************************************************************************/
/* G l o b a l O b j e c t s */
/******************************************************************************/
// Message router used throughout this file for Emsg() and Say() output.
extern XrdSysError OfsEroute;
// Trace object; presumably what the GTRACE/DEBUG macros consult - TODO confirm.
extern XrdOucTrace OfsTrace;
// Storage system object; used by Xeq() to Unlink() a job's Lfn when autoRM is set.
extern XrdOss *XrdOfsOss;
// Third party copy parameters. They are defined elsewhere; this file only
// reads them (except doEcho, which Init() may turn on).
namespace XrdOfsTPCParms
{
// Passed to Prog.Setup() by Init() for each program object.
extern char *XfrProg;
// Checksum value passed after "-C" when the job itself specifies none.
extern char *cksType;
// Number of program objects Init() allocates.
extern int xfrMax;
// Handed to each program object's constructor by Init().
extern int errMon;
// When true, Xeq() echoes what is being copied and the program's output lines.
extern bool doEcho;
// When true, Xeq() unlinks the job's Lfn via XrdOfsOss (failure path, as far
// as can be told from this file).
extern bool autoRM;
};
using namespace XrdOfsTPCParms;
/******************************************************************************/
/* S t a t i c V a r i a b l e s */
/******************************************************************************/
// Held by Run() and Start() while they manipulate the idle list (pgmIdle).
XrdSysMutex XrdOfsTPCProg::pgmMutex;
// Head of the list of idle program objects, chained through their Next member.
XrdOfsTPCProg *XrdOfsTPCProg::pgmIdle = 0;
/******************************************************************************/
/* E x t e r n a l L i n k a g e s */
/******************************************************************************/
void *XrdOfsTPCProgRun(void *pp)
{
XrdOfsTPCProg *theProg = (XrdOfsTPCProg *)pp;
theProg->Run();
return (void *)0;
}
/******************************************************************************/
/* L o c a l C l a s s e s */
/******************************************************************************/
namespace
{
// Works out the "<envar>=<path>" string for a job's credentials file and makes
// sure that the file, if any, is removed when this object is destroyed. The
// file itself is not created here.
//
class credFile
{
public:

char *Path;                  // -> path portion of pEnv; nil if no credentials
char  pEnv[MAXPATHLEN+65];   // "<envar>=<path>", only valid when Path is set

      credFile(XrdOfsTPCJob *jP) : Path(0)
              {// Credentials, their length, and the name of the envar that
               // is to point to them must all be present; otherwise there is
               // nothing to do and Path stays nil.
               //
               if (!(jP->Info.Csz > 0 && jP->Info.Crd && jP->Info.Env)) return;

               // Grab a unique sequence number to keep file names distinct
               //
               csMutex.Lock();
               int seqNum = cSeq++;
               csMutex.UnLock();

               // Build the envar assignment; the path follows the equals sign
               //
               snprintf(pEnv, sizeof(pEnv), "%s=%s%s#%d.creds",
                        jP->Info.Env, jP->credPath(), jP->Info.Org, seqNum);
               Path = strchr(pEnv, '=') + 1;
              }

     ~credFile()
              {if (Path) unlink(Path);
              }

private:

static XrdSysMutex csMutex;  // Protects cSeq
static int         cSeq;     // Next sequence number to hand out
};

XrdSysMutex credFile::csMutex;
int         credFile::cSeq = 0;
}
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
// Construct a program object.
//
// Prev   -> object that is to follow this one in the chain (becomes Next).
// num       number used to label this object's messages ("TPC job <num>: ").
// errMon    passed through to the underlying Prog object.
//
XrdOfsTPCProg::XrdOfsTPCProg(XrdOfsTPCProg *Prev, int num, int errMon)
              : Prog(&OfsEroute, errMon),
                JobStream(&OfsEroute),
                Next(Prev),
                Job(0)
{
// Build the message prefix; the explicit terminator is purely defensive
//
   snprintf(Pname, sizeof(Pname), "TPC job %d: ", num);
   Pname[sizeof(Pname)-1] = '\0';
}
/******************************************************************************/
/* E x p o r t C r e d s */
/******************************************************************************/
int XrdOfsTPCProg::ExportCreds(const char *path)
{
static const int oOpts = (O_CREAT | O_TRUNC | O_WRONLY);
static const mode_t oMode = (S_IRUSR | S_IWUSR);
int fd, rc;
// Open the file as if it were new
//
fd = XrdSysFD_Open(path, oOpts, oMode);
if (fd < 0)
{rc = errno;
OfsEroute.Emsg("TPC", rc, "create credentials file", path);
return -rc;
}
// Write out the credentials
//
if (write(fd, Job->Info.Crd, Job->Info.Csz) < 0)
{rc = errno;
OfsEroute.Emsg("TPC", rc, "write credentials file", path);
} else rc = 0;
// Close the file and return (we ignore close errors)
//
close(fd);
return rc;
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
int XrdOfsTPCProg::Init()
{
int n;
// Allocate copy program objects
//
for (n = 0; n < xfrMax; n++)
{pgmIdle = new XrdOfsTPCProg(pgmIdle, n, errMon);
if (pgmIdle->Prog.Setup(XfrProg, &OfsEroute)) return 0;
}
// All done
//
doEcho = doEcho || GTRACE(debug);
return 1;
}
/******************************************************************************/
/* R u n */
/******************************************************************************/
// Body of the thread started by Start(). Jobs are executed one after another
// until there are no more, after which this object returns to the idle list.
//
void XrdOfsTPCProg::Run()
{
// Execute the current job, report its ending status, and pick up whatever job
// we are handed next. Note that "Job" is always valid when we get here.
//
   for (;;)
       {int xeqRC = Xeq();
        Job = Job->Done(this, eRec, xeqRC);
        if (!Job) break;
       }

// Nothing left to run. Put ourselves at the head of the idle queue. Upon
// return this thread will end.
//
   pgmMutex.Lock();
   Next    = pgmIdle;
   pgmIdle = this;
   pgmMutex.UnLock();
}
/******************************************************************************/
/* S t a r t */
/******************************************************************************/
// Assign a job to an idle program object and start a thread to run it.
//
// jP     -> the job that is to be run.
// rc        set to 0 when no idle program object exists or the thread was
//           started; otherwise, set to the non-zero thread creation result.
//
// Returns a pointer to the program object running the job, or nil if the job
// could not be started (see rc for the reason).
//
XrdOfsTPCProg *XrdOfsTPCProg::Start(XrdOfsTPCJob *jP, int &rc)
{
   XrdSysMutexHelper idleGuard(&pgmMutex);
   XrdOfsTPCProg *candidate = pgmIdle;
   pthread_t tid;

// Without an idle program object the caller has to try again later
//
   if (!candidate)
      {rc = 0;
       return 0;
      }

// Hand the job to the object and start a thread to run it
//
   candidate->Job = jP;
   rc = XrdSysThread::Run(&tid, XrdOfsTPCProgRun, (void *)candidate, 0,
                          "TPC job");
   if (rc) return 0;

// The thread is running; the object is no longer idle so unchain it
//
   pgmIdle = candidate->Next;
   return candidate;
}
/******************************************************************************/
/* X e q */
/******************************************************************************/
// Execute the copy program for the current job ("Job" must be valid).
//
// The program is passed optional "-C <cks>" and "-S <streams>" arguments
// followed by the source key and the destination. Its environment always gets
// XRD_TIDENT and, when available, XRDTPC_SPROT, XRDTPC_TPROT, and the envar
// that points to the exported credentials file. The program's output is
// drained; the text after the last "<something>: " seen is kept in eRec as the
// error text.
//
// Returns 0 upon success (Job->Info.Success() has been called); otherwise, a
// non-zero value is returned and eRec holds an explanation.
//
int XrdOfsTPCProg::Xeq()
{
   EPNAME("Xeq");
   credFile cFile(Job);   // Removes the credentials file when we return
   const char *Args[6], *eVec[5], **envArg;
   char *lP, *Colon, *cksVal, sBuff[16], *tident = Job->Info.Org;
   char *Quest = index(Job->Info.Key, '?');
   int i, rc, aNum = 0;

// If we have credentials, write them out to a file
//
   if (cFile.Path && (rc = ExportCreds(cFile.Path)))
      {strcpy(eRec, "Copy failed; unable to pass credentials.");
       return rc;
      }

// Echo out what we are doing if so desired (the cgi is temporarily hidden)
//
   if (doEcho)
      {if (Quest) *Quest = 0;
       OfsEroute.Say(Pname,tident," copying ",Job->Info.Key," to ",Job->Info.Dst);
       if (Quest) *Quest = '?';
      }

// Determine checksum option; the job's setting overrides the default
//
   cksVal = (Job->Info.Cks ? Job->Info.Cks : XrdOfsTPCParms::cksType);
   if (cksVal)
      {Args[aNum++] = "-C";
       Args[aNum++] = cksVal;
      }

// Set streams option if need be (bounded so any int value fits)
//
   if (Job->Info.Str)
      {snprintf(sBuff, sizeof(sBuff), "%d", static_cast<int>(Job->Info.Str));
       Args[aNum++] = "-S";
       Args[aNum++] = sBuff;
      }

// Set remaining arguments
//
   Args[aNum++] = Job->Info.Key;
   Args[aNum++] = Job->Info.Dst;

// Always export the trace identifier of the original issuer
//
   char tidBuff[512];
   snprintf(tidBuff, sizeof(tidBuff), "XRD_TIDENT=%s", tident);
   eVec[0] = tidBuff;
   envArg = eVec;
   i = 1;

// Export source protocol if present
//
   char sprBuff[128];
   if (Job->Info.Spr)
      {snprintf(sprBuff, sizeof(sprBuff), "XRDTPC_SPROT=%s", Job->Info.Spr);
       eVec[i++] = sprBuff;
      }

// Export target protocol if present. This must reference the target buffer;
// otherwise, the source protocol would be exported twice and the target never.
//
   char tprBuff[128];
   if (Job->Info.Tpr)
      {snprintf(tprBuff, sizeof(tprBuff), "XRDTPC_TPROT=%s", Job->Info.Tpr);
       eVec[i++] = tprBuff;
      }

// Determine if credentials are being passed, If so, we don't need any cgi but
// we must set an envar to point to the file holding the credentials.
// NOTE(review): the cgi is not restored to the key afterwards - confirm that
//               nothing relies on the full key once the job has been run.
//
   if (cFile.Path)
      {eVec[i++] = cFile.pEnv;
       if (Quest) *Quest = 0;
      }
   eVec[i] = 0;

// Start the job.
//
   if ((rc = Prog.Run(&JobStream, Args, aNum, envArg)))
      {strcpy(eRec, "Copy failed; unable to start job.");
       OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
       return rc;
      }

// Now we drain the output looking for an end of run line. This line should
// be printed as an error message should the copy fail.
//
   *eRec = 0;
   while((lP = JobStream.GetLine()))
        {if ((Colon = index(lP, ':')) && *(Colon+1) == ' ')
            {strncpy(eRec, Colon+2, sizeof(eRec)-1);
             eRec[sizeof(eRec)-1] = 0;
            }
         if (doEcho && *lP) OfsEroute.Say(Pname, lP);
        }

// The job has completed. So, we must get the ending status.
//
   if ((rc = Prog.RunDone(JobStream)) < 0) rc = -rc;
   DEBUG(Pname <<"ended with rc=" <<rc);

// Make sure a failure always carries some explanation, even when the program
// did not print a recognizable error line.
//
   if (rc && !(*eRec))
      snprintf(eRec, sizeof(eRec), "Copy failed with return code %d", rc);

// Log failures and optionally remove the target file; otherwise, record the
// success.
//
   if (rc)
      {OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
       if (autoRM) XrdOfsOss->Unlink(Job->Info.Lfn);
      } else Job->Info.Success();

// All done
//
   return rc;
}