/******************************************************************************/
/* */
/* X r d F r m C n s . c c */
/* */
/* (c) 2011 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdFrc/XrdFrcTrace.hh"
#include "XrdFrm/XrdFrmCns.hh"
#include "XrdFrm/XrdFrmConfig.hh"
#include "XrdOuc/XrdOucSxeq.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
using namespace XrdFrc;
using namespace XrdFrm;
/******************************************************************************/
/* S t a t i c V a r i a b l e s */
/******************************************************************************/
// Path of the cnsd events file; allocated with strdup() by setPath().
//
char *XrdFrmCns::cnsPath = 0;
// Pre-built record headers used by Del(), indexed by header type (HdrRmd and
// HdrRmf); allocated with strdup() by Init(myID, aPath, iName).
//
char *XrdFrmCns::cnsHdr[2] = {0, 0};
// Length of the header text as computed for the "rmdir" header.
//
int XrdFrmCns::cnsHdrLen = 0;
// File descriptor of the opened events file; -1 until Init() opens it.
//
int XrdFrmCns::cnsFD = -1;
// Operating mode: zero means no notifications are sent, a positive value makes
// Retry() sleep and retry, any other value makes Retry() give up immediately.
//
int XrdFrmCns::cnsMode = XrdFrmCns::cnsIgnore;
// Non-zero while the events file still needs to be opened by Init().
//
int XrdFrmCns::cnsInit = 1;
/******************************************************************************/
/* A d d */
/******************************************************************************/
// Notify the cnsd that a file has been created and closed. Two records are
// written with a single vector write:
//
//    <tID> create <mode> <Path>\n
//    <tID> closew <Path> <Size>\n
//
// tID   the trace identifier placed at the start of each record.
// Path  the path being reported.
// Size  the size placed in the closew record.
// Mode  the file mode; only the permission bits are reported (in octal).
//
// Nothing is sent when cnsMode is zero. A failure to open the events file or
// to write the records is logged and otherwise ignored.
//
void XrdFrmCns::Add(const char *tID, const char *Path,
                    long long Size, mode_t Mode)
{
   static const int mMask = S_IRWXU|S_IRWXG|S_IRWXO;
   static char NewLine = '\n';
   struct iovec iov[9];
   char mBuff[8], sBuff[24];

// Check if there is a cns here and we should initialize it
//
   if (!cnsMode) return;
   if (cnsInit && !Init())
      {Say.Emsg("FrmCns", "Auto-ignore cnsd create", Path); return;}

// Fill out the io vector for the create record. The formatted pieces always
// fit: at most 3 octal digits plus a blank, and at most 20 characters for a
// long long plus a blank and a newline.
//
   iov[0].iov_base = (char *)tID;
   iov[0].iov_len  = strlen(tID);
   iov[1].iov_base = (char *)" create ";
   iov[1].iov_len  = 8;
   iov[2].iov_base = mBuff;
   iov[2].iov_len  = snprintf(mBuff, sizeof(mBuff), "%3o ",
                              (unsigned int)(Mode & mMask));
   iov[3].iov_base = (char *)Path;
   iov[3].iov_len  = strlen(Path);
   iov[4].iov_base = &NewLine;
   iov[4].iov_len  = 1;

// Fill out the io vector for the closew record (same id and path as above)
//
   iov[5]          = iov[0];
   iov[6].iov_base = (char *)" closew ";
   iov[6].iov_len  = 8;
   iov[7]          = iov[3];
   iov[8].iov_base = sBuff;
   iov[8].iov_len  = snprintf(sBuff, sizeof(sBuff), " %lld\n", Size);

// Send this off to the cnsd
//
   if (!Send2Cnsd(iov, 9)) Say.Emsg("FrmCns", "Auto-ignore cnsd create", Path);
}
/******************************************************************************/
/* D e l */
/******************************************************************************/
// Notify the cnsd that a file or directory has been removed. The record sent
// is the pre-built header selected by HdrType, followed by the logical file
// name and a newline.
//
// Path     the path that was removed.
// HdrType  index into cnsHdr selecting the header to send.
// islfn    non-zero when Path is already a logical file name; otherwise Path
//          is translated using Config.LogicalPath() and nothing is sent if
//          that translation fails.
//
// A failure to open the events file or to write the record is logged and
// otherwise ignored.
//
void XrdFrmCns::Del(const char *Path, int HdrType, int islfn)
{
   static char NewLine = '\n';
   struct iovec iov[3];
   char buff[MAXPATHLEN];
   char *hdr = cnsHdr[HdrType];

// Check if we should initialize
//
   if (cnsInit && !Init())
      {Say.Emsg("FrmCns", "Auto-ignore cnsd remove", Path); return;}

// Use the length of the header actually selected. The shared cnsHdrLen is
// computed from the "rmdir" header and can exceed the length of the other
// header, which would make the write read past the end of that string. A
// header that was never built contributes nothing to the record.
//
   iov[0].iov_base = hdr;
   iov[0].iov_len  = (hdr ? strlen(hdr) : 0);

// In most cases, del gets a pfn. We need to translate to an lfn.
//
   if (islfn)
      {iov[1].iov_base = (char *)Path;
       iov[1].iov_len  = strlen(Path);
      } else {
       if (!Config.LogicalPath(Path, buff, sizeof(buff))) return;
       iov[1].iov_base = buff;
       iov[1].iov_len  = strlen(buff);
      }

// Terminate the record
//
   iov[2].iov_base = &NewLine;
   iov[2].iov_len  = 1;

// Send this off to the cnsd
//
   if (!Send2Cnsd(iov, 3)) Say.Emsg("FrmCns", "Auto-ignore cnsd remove", Path);
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
// Open the cnsd events file if that has not been done yet. The open is
// attempted for writing in non-blocking mode and is repeated for as long as
// Retry() asks for another attempt. Callers are serialized by a mutex.
//
// Returns 1 when the events file is open (now or from an earlier call) and
// 0 when it could not be opened.
//
int XrdFrmCns::Init()
{
   static const int openFlags = O_WRONLY | O_NONBLOCK | O_NDELAY;
   static XrdSysMutex initMutex;
   int msgCount = 0;
   int isOpen   = 1;

   initMutex.Lock();

// Only do the work if the file still needs to be opened
//
   if (cnsInit)
      {// The events FIFO might not exist or we might not be able to write
       //
       do {cnsFD = open(cnsPath, openFlags);}
          while(cnsFD < 0 && Retry(errno, msgCount));

       // Record how we ended so the next call knows whether to try again
       //
       if (cnsFD < 0) {cnsInit = 1; isOpen = 0;}
          else cnsInit = 0;
      }

   initMutex.UnLock();
   return isOpen;
}
/******************************************************************************/
// Record the operating mode and, when a path is supplied, the location of the
// cnsd events file.
//
// aPath  the admin path used to derive the events file path; may be null, in
//        which case the current path is left alone.
// Opts   the value to be stored as the operating mode.
//
// Returns 0 upon success or the non-zero result of setPath() upon failure, in
// which case the operating mode is left unchanged.
//
int XrdFrmCns::Init(const char *aPath, int Opts)
{
   if (aPath)
      {int pathRC = setPath(aPath, 0);
       if (pathRC) return pathRC;
      }

   cnsMode = Opts;
   return 0;
}
/******************************************************************************/
int XrdFrmCns::Init(const char *myID, const char *aPath, const char *iName)
{
char buff[2048];
int rc;
// If we are ignoring the cns then don't bother with this
//
if (!cnsMode) return 0;
// Construct the path to he cns events file (we know buff is large enough)
//
if (!cnsPath && (rc = setPath(aPath, iName))) return rc;
// Create a static headers for deletes
//
cnsHdrLen = sprintf(buff, "%s.%d.0@localhost rmdir ", myID, getpid());
cnsHdr[HdrRmd] = strdup(buff);
sprintf(buff, "%s.%d.0@localhost rm ", myID, getpid());
cnsHdr[HdrRmf] = strdup(buff);
// All done
//
return 0;
}
/******************************************************************************/
/* R e t r y */
/******************************************************************************/
// Decide whether a failed open or write to the cnsd events file should be
// tried again, issuing a message as appropriate.
//
// eNum  the error number of the failed operation.
// pMsg  a counter owned by the caller that is used to throttle the message
//       issued for the errors ENOENT, EAGAIN, ENXIO and EPIPE; it is
//       incremented each time one of those errors is seen.
//
// Returns 1 if the caller should retry and 0 if the caller should give up.
// When cnsMode is positive the call pauses before asking for a retry.
//
int XrdFrmCns::Retry(int eNum, int &pMsg)
{
   static const int Yawn = 10, Blurt = 6;

// Describe what we are doing based on the current mode (evaluated on each
// call so that the text always agrees with the decision made below)
//
   const char *eAct = (cnsMode > 0 ? "Waiting for" : "Ignoring");

// Always retry interrupted calls (these rarely happen, if ever)
//
   if (eNum == EINTR) return 1;

// Issue message as needed. Errors in the first group are reported only once
// every Blurt occurrences; anything else is always reported using the error
// number that was handed to us.
//
   if (eNum == ENOENT || eNum == EAGAIN || eNum == ENXIO || eNum == EPIPE)
      {if (!(pMsg++%Blurt)) Say.Emsg("FrmCns", eAct, "cnsd on path", cnsPath);}
      else Say.Emsg("FrmCns", eNum, "notify cnsd via", cnsPath);

// Check if we should sleep and retry or simply ignore this
//
   if (cnsMode <= 0) return 0;
   XrdSysTimer::Snooze(Yawn);
   return 1;
}
/******************************************************************************/
/* S e n d 2 C n s d */
/******************************************************************************/
// Write the supplied io vector to the cnsd events file. The write is repeated
// for as long as it fails and Retry() asks for another attempt.
//
// iov   the vector of buffers making up the record(s).
// iovn  the number of elements in iov.
//
// Returns 1 if data was written and 0 otherwise.
//
int XrdFrmCns::Send2Cnsd(struct iovec *iov, int iovn)
{
   int msgCount = 0;
   int nWritten;

// Normally, writes will be atomic if we don't exceed PIPE_BUF, but this is
// not guaranteed when using vector writes. Plus, on some platforms, PIPE_BUF
// is way too small (i.e. 512 bytes). So, we just lock the file.
//
   XrdOucSxeq::Serialize(cnsFD, 0);

// Now write the data, trying again whenever Retry() says so
//
   do {nWritten = writev(cnsFD, iov, iovn);}
      while(nWritten < 0 && Retry(errno, msgCount));

// Unlock the file
//
   XrdOucSxeq::Release(cnsFD);

// Tell the caller whether anything was sent
//
   return (nWritten > 0 ? 1 : 0);
}
/******************************************************************************/
/* s e t P a t h */
/******************************************************************************/
int XrdFrmCns::setPath(const char *aPath, const char *iName)
{
static const char *sfx = "XrdCnsd.events";
struct stat Stat;
char buff[2048], *pP;
// Release any cnspath we have
//
if (cnsPath) {free(cnsPath); cnsPath = 0;}
// Generate a new one and make sure it is usable
//
pP = XrdOucUtils::genPath(aPath, iName, "cns");
if (strlen(pP) + strlen(sfx) >= (int)sizeof(buff))
{Say.Emsg("FrmCns", "Invalid cnsd apath", aPath); free(pP); return 1;}
strcpy(buff, pP); free(pP);
strcat(buff, "XrdCnsd.events");
if (stat(buff, &Stat) && errno != ENOENT)
{Say.Emsg("FrmCns", errno, "use cnsd file", buff); return 1;}
cnsPath = strdup(buff);
return 0;
}