/******************************************************************************/ /* */ /* X r d O f s E v s . c c */ /* */ /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /* Based on code developed by Derek Feichtinger, CERN. */ /******************************************************************************/ #include #include #include #include #include #include #include #include "XrdOfs/XrdOfsEvs.hh" #include "XrdSys/XrdSysError.hh" #include "XrdOuc/XrdOucProg.hh" #include "XrdOuc/XrdOucStream.hh" #include "XrdNet/XrdNetOpts.hh" #include "XrdNet/XrdNetSocket.hh" #include "XrdSys/XrdSysPlatform.hh" /******************************************************************************/ /* L o c a l C l a s s e s */ /******************************************************************************/ class XrdOfsEvsMsg { public: XrdOfsEvsMsg *next; char *text; int tlen; int isBig; XrdOfsEvsMsg(char *tval=0, int big=0) {text = tval; tlen=0; isBig = big; next=0;} ~XrdOfsEvsMsg() {if (text) free(text);} }; /******************************************************************************/ /* E x t e r n a l L i n k a g e s */ /******************************************************************************/ void *XrdOfsEvsSend(void *pp) { XrdOfsEvs *evs = (XrdOfsEvs *)pp; evs->sendEvents(); return (void *)0; } /******************************************************************************/ /* S t a t i c D e f i n i t i o n s */ /******************************************************************************/ XrdOfsEvsFormat XrdOfsEvs::MsgFmt[XrdOfsEvs::nCount]; const int XrdOfsEvs::minMsgSize; const int XrdOfsEvs::maxMsgSize; /******************************************************************************/ /* X r d E v s F o r m a t : : D e f */ /******************************************************************************/ void XrdOfsEvsFormat::Def(evFlags theFlags, const char *Fmt, ...) { va_list ap; int theVal, i = 0; // Return if already defined // if (Format) return; // Set flags and format. Prepare the arg vector // Flags = theFlags; Format = Fmt; memset(Args, 0, sizeof(Args)); // Pick up all arguments // va_start(ap, Fmt); while((theVal = va_arg(ap, int)) >= 0) Args[i++] = static_cast(theVal); va_end(ap); } /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdOfsEvs::XrdOfsEvs(Event theEvents, const char *Target, int minq, int maxq) { // Set common variables // enEvents = static_cast(theEvents & enMask); endIT = 0; theTarget = strdup(Target); eDest = 0; theProg = 0; maxMin = minq; maxMax = maxq; msgFirst = msgLast = msgFreeMax = msgFreeMin = 0; numMax = numMin = 0; tid = 0; msgFD = -1; // Initialize all static format entries that have not been initialized yet. // Note that format may be specified prior to this object being created! // // chmod // MsgFmt[Chmod & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s chmod %s %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1); // closer // MsgFmt[Closer & Mask].Def(XrdOfsEvsFormat::Null, "%s closer %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1); // closew // MsgFmt[Closew & Mask].Def(XrdOfsEvsFormat::Null, "%s closew %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1); // create // MsgFmt[Create & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s create %s %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1); // mkdir // MsgFmt[Mkdir & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s mkdir %s %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1); // mv // MsgFmt[Mv & Mask].Def(XrdOfsEvsFormat::Null, "%s mv %s %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, XrdOfsEvsInfo::evLFN2, -1); // openr // MsgFmt[Openr & Mask].Def(XrdOfsEvsFormat::Null, "%s openr %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1); // openw // MsgFmt[Openw & Mask].Def(XrdOfsEvsFormat::Null, "%s openw %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1); // rm // MsgFmt[Rm & Mask].Def(XrdOfsEvsFormat::Null, "%s rm %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1); // rmdir // MsgFmt[Rmdir & Mask].Def(XrdOfsEvsFormat::Null, "%s rmdir %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1); // trunc // MsgFmt[Trunc & Mask].Def(XrdOfsEvsFormat::cvtFSize,"%s trunc %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evFSIZE,-1); // fwrite // MsgFmt[Fwrite & Mask].Def(XrdOfsEvsFormat::Null, "%s fwrite %s\n", XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1); } /******************************************************************************/ /* D e s t r u c t o r */ /******************************************************************************/ XrdOfsEvs::~XrdOfsEvs() { XrdOfsEvsMsg *tp; // Kill the notification thread. This may cause a msg block to be orphaned // but, in practice, this object does not really get deleted after being // started. So, the problem is moot. // endIT = 1; if (tid) XrdSysThread::Kill(tid); // Release all queued message bocks // qMut.Lock(); while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;} if (theTarget) free(theTarget); if (msgFD >= 0)close(msgFD); if (theProg) delete theProg; qMut.UnLock(); // Release all free message blocks // fMut.Lock(); while ((tp = msgFreeMax)) {msgFreeMax = tp->next; delete tp;} while ((tp = msgFreeMin)) {msgFreeMin = tp->next; delete tp;} fMut.UnLock(); } /******************************************************************************/ /* N o t i f y */ /******************************************************************************/ void XrdOfsEvs::Notify(Event eID, XrdOfsEvsInfo &Info) { static int warnings = 0; XrdOfsEvsFormat *fP; XrdOfsEvsMsg *tp; char modebuff[8], sizebuff[16]; int eNum, isBig = (eID & Mv), msgSize = (isBig ? maxMsgSize : minMsgSize); // Validate event number and set event name // eNum = eID & Mask; if (eNum < 0 || eNum >= nCount) return; // Check if we need to do any conversions // fP = &MsgFmt[eNum]; if (fP->Flags & XrdOfsEvsFormat::cvtMode) {sprintf(modebuff, "%o", static_cast((Info.FMode() & S_IAMB))); Info.Set(XrdOfsEvsInfo::evFMODE, modebuff); } else Info.Set(XrdOfsEvsInfo::evFMODE, "$FMODE"); if (fP->Flags & XrdOfsEvsFormat::cvtFSize) {sprintf(sizebuff, "%lld", Info.FSize()); Info.Set(XrdOfsEvsInfo::evFSIZE, sizebuff); } else Info.Set(XrdOfsEvsInfo::evFSIZE, "$FSIZE"); // Get a message block // if (!(tp = getMsg(isBig))) {if ((++warnings & 0xff) == 1) {eDest->Emsg("Notify", "Ran out of message objects;", eName(eNum), "event notification not sent."); } return; } // Format the message // tp->tlen = fP->SNP(Info, tp->text, msgSize); // Put the message on the queue and return // tp->next = 0; qMut.Lock(); if (msgLast) {msgLast->next = tp; msgLast = tp;} else msgFirst = msgLast = tp; qMut.UnLock(); qSem.Post(); } /******************************************************************************/ /* P a r s e */ /******************************************************************************/ int XrdOfsEvs::Parse(XrdSysError &Eroute, XrdOfsEvs::Event eNum, char *mText) { static struct valVar {const char *vname; XrdOfsEvsInfo::evArg vnum; XrdOfsEvsFormat::evFlags vopt;} Vars[] = { {"TID", XrdOfsEvsInfo::evTID, XrdOfsEvsFormat::Null}, {"LFN", XrdOfsEvsInfo::evLFN1, XrdOfsEvsFormat::Null}, {"LFN1", XrdOfsEvsInfo::evLFN1, XrdOfsEvsFormat::Null}, {"CGI", XrdOfsEvsInfo::evCGI1, XrdOfsEvsFormat::Null}, {"CGI1", XrdOfsEvsInfo::evCGI1, XrdOfsEvsFormat::Null}, {"LFN2", XrdOfsEvsInfo::evLFN2, XrdOfsEvsFormat::Null}, {"CGI2", XrdOfsEvsInfo::evCGI2, XrdOfsEvsFormat::Null}, {"FMODE", XrdOfsEvsInfo::evFMODE, XrdOfsEvsFormat::cvtMode}, {"FSIZE", XrdOfsEvsInfo::evFSIZE, XrdOfsEvsFormat::cvtFSize} }; int numvars = sizeof(Vars)/sizeof(struct valVar); char parms[1024], *pP = parms; char *pE = parms+sizeof(parms)-((XrdOfsEvsInfo::evARGS*2)-8); char varbuff[16], *bVar, *eVar; int i, j, aNum = 0, Args[XrdOfsEvsInfo::evARGS] = {0}; XrdOfsEvsFormat::evFlags ArgOpts = XrdOfsEvsFormat::freeFmt; // Parse the text // parms[0] = '\0'; while(*mText && pP < pE) {if (*mText == '\\' && *(mText+1) == '$') {*pP++ = '$'; mText += 2; continue;} else if (*mText != '$') {*pP++ = *mText++; continue;} bVar = mText+1; if (*mText == '{') {eVar = index(mText, '}'); j = 1;} else if (*mText == '[') {eVar = index(mText, ']'); j = 1;} else {eVar = bVar; while(isalpha(*eVar)) eVar++; j = 0;} i = eVar - bVar; if (i < 1 || i >= (int)sizeof(varbuff)) {Eroute.Emsg("Parse","Invalid notifymsg variable starting at",mText); return 1; } strncpy(varbuff, bVar, i); varbuff[i] = '\0'; for (i = 0; i < numvars; i++) if (!strcmp(varbuff, Vars[i].vname)) break; if (i >= numvars) {Eroute.Emsg("Parse", "Unknown notifymsg variable -",varbuff); return 1; } if (aNum >= XrdOfsEvsInfo::evARGS) {Eroute.Say("Parse", "Too many notifymsg variables"); return 1;} strcpy(pP, "%s"); pP += 2; Args[aNum++] = Vars[i].vnum; ArgOpts = static_cast(ArgOpts|Vars[i].vopt); mText = eVar+j; } // Check if we overran the buffer or didn't have any text // if (pP >= pE) {Eroute.Emsg("Parse","notifymsg text too long");return 1;} if (!parms[0]) {Eroute.Emsg("Parse","notifymsg text not specified");return 1;} // Set the format // strcpy(pP, "\n"); eNum = static_cast(eNum & Mask); MsgFmt[eNum].Set(ArgOpts, strdup(parms), Args); // All done // return 0; } /******************************************************************************/ /* s e n d E v e n t s */ /******************************************************************************/ void XrdOfsEvs::sendEvents(void) { XrdOfsEvsMsg *tp; const char *theData[2] = {0,0}; int theDlen[2] = {0,0}; // This is an endless loop that just gets things off the event queue and // send them out. This allows us to only hang a simgle thread should the // receiver get blocked, instead of the whole process. // while(1) {qSem.Wait(); qMut.Lock(); if (endIT) break; if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0; qMut.UnLock(); if (tp) {if (!theProg) Feed(tp->text, tp->tlen); else {theData[0] = tp->text; theDlen[0] = tp->tlen; theProg->Feed(theData, theDlen); } retMsg(tp); } } qMut.UnLock(); } /******************************************************************************/ /* S t a r t */ /******************************************************************************/ int XrdOfsEvs::Start(XrdSysError *eobj) { int rc; // Set the error object pointer // eDest = eobj; // Check if we need to create a socket to a path // if (*theTarget == '>') {XrdNetSocket *msgSock; if (!(msgSock = XrdNetSocket::Create(eobj,theTarget+1,0,0660,XRDNET_FIFO))) return -1; msgFD = msgSock->Detach(); delete msgSock; } else { // Allocate a new program object if we don't have one // if (theProg) return 0; theProg = new XrdOucProg(eobj); // Setup the program // if (theProg->Setup(theTarget, eobj)) return -1; if ((rc = theProg->Start())) {eobj->Emsg("Evs", rc, "start event collector"); return -1;} } // Now start a thread to get messages and send them to the collector // if ((rc = XrdSysThread::Run(&tid, XrdOfsEvsSend, static_cast(this), 0, "Event notification sender"))) {eobj->Emsg("Evs", rc, "create event notification thread"); return -1; } // All done // return 0; } /******************************************************************************/ /* P r i v a t e M e t h o d s */ /******************************************************************************/ /******************************************************************************/ /* e N a m e */ /******************************************************************************/ const char *XrdOfsEvs::eName(int eNum) { static const char *eventName[] = {"Chmod", "closer", "closew", "create", "fwrite", "mkdir", "mv", "openr", "opnw", "rm", "rmdir", "trunc"}; eNum = (eNum & Mask); return (eNum < 0 || eNum >= nCount ? "?" : eventName[eNum]); } /******************************************************************************/ /* F e e d */ /******************************************************************************/ int XrdOfsEvs::Feed(const char *data, int dlen) { int retc; // Write the data. ince this is a udp socket all the data goes or none does // do { retc = write(msgFD, (const void *)data, (size_t)dlen);} while (retc < 0 && errno == EINTR); if (retc < 0) {eDest->Emsg("EvsFeed", errno, "write to event socket", theTarget); return -1; } // All done // return 0; } /******************************************************************************/ /* g e t M s g */ /******************************************************************************/ XrdOfsEvsMsg *XrdOfsEvs::getMsg(int bigmsg) { XrdOfsEvsMsg *tp; int msz = 0; // Lock the free queue // fMut.Lock(); // Get a free element from the big or small queue, as needed // if (bigmsg) if ((tp = msgFreeMax)) msgFreeMax = tp->next; else msz = maxMsgSize; else if ((tp = msgFreeMin)) msgFreeMin = tp->next; else msz = minMsgSize; // Check if we have to allocate a new item // if (!tp && (numMax + numMin) < (maxMax + maxMin)) {if ((tp = new XrdOfsEvsMsg((char *)malloc(msz), bigmsg))) {if (!(tp->text)) {delete tp; tp = 0;} else if (bigmsg) numMax++; else numMin++; } } // Unlock and return result // fMut.UnLock(); return tp; } /******************************************************************************/ /* r e t M s g */ /******************************************************************************/ void XrdOfsEvs::retMsg(XrdOfsEvsMsg *tp) { // Lock the free queue // fMut.Lock(); // Check if we exceeded the hold quotax // if (tp->isBig) if (numMax > maxMax) {delete tp; numMax--;} else {tp->next = msgFreeMax; msgFreeMax = tp;} else if (numMin > maxMin) {delete tp; numMin--;} else {tp->next = msgFreeMin; msgFreeMin = tp;} // Unlock and return // fMut.UnLock(); }