/******************************************************************************/
/* */
/* X r d B w m L o g g e r . c c */
/* */
/* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include "XrdBwm/XrdBwmLogger.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdOuc/XrdOucProg.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "XrdNet/XrdNetOpts.hh"
#include "XrdNet/XrdNetSocket.hh"
#include "XrdSys/XrdSysPlatform.hh"
/******************************************************************************/
/* L o c a l C l a s s e s */
/******************************************************************************/
class XrdBwmLoggerMsg
{
public:
static const int msgSize = 2048;
XrdBwmLoggerMsg *next;
char Text[msgSize];
int Tlen;
XrdBwmLoggerMsg() : next(0), Tlen(0) {}
~XrdBwmLoggerMsg() {}
};
/******************************************************************************/
/* E x t e r n a l L i n k a g e s */
/******************************************************************************/
void *XrdBwmLoggerSend(void *pp)
{
XrdBwmLogger *lP = (XrdBwmLogger *)pp;
lP->sendEvents();
return (void *)0;
}
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
XrdBwmLogger::XrdBwmLogger(const char *Target)
{
// Set common variables
//
theTarget = strdup(Target);
eDest = 0;
theProg = 0;
msgFirst = msgLast = msgFree = 0;
tid = 0;
msgFD = 0;
endIT = 0;
theEOL= '\n';
msgsInQ = 0;
}
/******************************************************************************/
/* D e s t r u c t o r */
/******************************************************************************/
XrdBwmLogger::~XrdBwmLogger()
{
XrdBwmLoggerMsg *tp;
// Kill the notification thread. This may cause a msg block to be orphaned
// but, in practice, this object does not really get deleted after being
// started. So, the problem is moot.
//
endIT = 1;
if (tid) XrdSysThread::Kill(tid);
// Release all queued message bocks
//
qMut.Lock();
while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
if (theTarget) free(theTarget);
if (msgFD >= 0) close(msgFD);
if (theProg) delete theProg;
qMut.UnLock();
// Release all free message blocks
//
fMut.Lock();
while ((tp = msgFree)) {msgFree = tp->next; delete tp;}
fMut.UnLock();
}
/******************************************************************************/
/* E v e n t */
/******************************************************************************/
void XrdBwmLogger::Event(Info &eInfo)
{
static int warnings = 0;
XrdBwmLoggerMsg *tp;
// Get a message block
//
if (!(tp = getMsg()))
{if ((++warnings & 0xff) == 1)
eDest->Emsg("Notify", "Ran out of logger message objects;",
eInfo.Tident, "event not logged.");
return;
}
// Format the message
//
tp->Tlen = snprintf(tp->Text, XrdBwmLoggerMsg::msgSize,
"%s%s"
"%s%s%c"
"%ld%ld%ld"
"%d%d%d"
"%lld%d%c",
eInfo.Tident, eInfo.Lfn, eInfo.lclNode, eInfo.rmtNode,
eInfo.Flow, eInfo.ATime, eInfo.BTime, eInfo.CTime,
eInfo.numqIn, eInfo.numqOut, eInfo.numqXeq, eInfo.Size,
eInfo.ESec, theEOL);
// Either log this or put the message on the queue and return
//
tp->next = 0;
qMut.Lock();
if (msgLast) {msgLast->next = tp; msgLast = tp;}
else msgFirst = msgLast = tp;
qMut.UnLock();
qSem.Post();
}
/******************************************************************************/
/* s e n d E v e n t s */
/******************************************************************************/
void XrdBwmLogger::sendEvents(void)
{
XrdBwmLoggerMsg *tp;
const char *theData[2] = {0,0};
int theDlen[2] = {0,0};
// This is an endless loop that just gets things off the event queue and
// send them out. This allows us to only hang a simgle thread should the
// receiver get blocked, instead of the whole process.
//
while(1)
{qSem.Wait();
qMut.Lock();
if (endIT) break;
if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
qMut.UnLock();
if (tp)
{if (!theProg) Feed(tp->Text, tp->Tlen);
else {theData[0] = tp->Text; theDlen[0] = tp->Tlen;
theProg->Feed(theData, theDlen);
}
retMsg(tp);
}
}
qMut.UnLock();
}
/******************************************************************************/
/* S t a r t */
/******************************************************************************/
int XrdBwmLogger::Start(XrdSysError *eobj)
{
int rc;
// Set the error object pointer
//
eDest = eobj;
// Check if we need to create a socket to a path
//
if (!strcmp("*", theTarget)) {msgFD = -1; theEOL = '\0';}
else if (*theTarget == '>')
{XrdNetSocket *msgSock;
if (!(msgSock = XrdNetSocket::Create(eobj, theTarget+1, 0, 0660,
XRDNET_FIFO))) return -1;
msgFD = msgSock->Detach();
delete msgSock;
}
else {// Allocate a new program object if we don't have one
//
if (theProg) return 0;
theProg = new XrdOucProg(eobj);
// Setup the program
//
if (theProg->Setup(theTarget, eobj)) return -1;
if ((rc = theProg->Start()))
{eobj->Emsg("Logger", rc, "start event collector"); return -1;}
}
// Now start a thread to get messages and send them to the collector
//
if ((rc = XrdSysThread::Run(&tid, XrdBwmLoggerSend, static_cast(this),
0, "Log message sender")))
{eobj->Emsg("Logger", rc, "create log message sender thread");
return -1;
}
// All done
//
return 0;
}
/******************************************************************************/
/* P r i v a t e M e t h o d s */
/******************************************************************************/
/******************************************************************************/
/* F e e d */
/******************************************************************************/
int XrdBwmLogger::Feed(const char *data, int dlen)
{
int retc;
// Send message to the log if need be
//
if (msgFD < 0) {eDest->Say("", data); return 0;}
// Write the data. since this is a udp socket all the data goes or none does
//
do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
while (retc < 0 && errno == EINTR);
if (retc < 0)
{eDest->Emsg("Feed", errno, "write to logger socket", theTarget);
return -1;
}
// All done
//
return 0;
}
/******************************************************************************/
/* g e t M s g */
/******************************************************************************/
XrdBwmLoggerMsg *XrdBwmLogger::getMsg()
{
XrdBwmLoggerMsg *tp;
// Lock the free queue
//
fMut.Lock();
// Get message object but don't give out too many
//
if (msgsInQ >= maxmInQ) tp = 0;
else {if ((tp = msgFree)) msgFree = tp->next;
else tp = new XrdBwmLoggerMsg();
msgsInQ++;
}
// Unlock and return result
//
fMut.UnLock();
return tp;
}
/******************************************************************************/
/* r e t M s g */
/******************************************************************************/
void XrdBwmLogger::retMsg(XrdBwmLoggerMsg *tp)
{
// Lock the free queue, return message, unlock the queue
//
fMut.Lock();
tp->next = msgFree;
msgFree = tp;
msgsInQ--;
fMut.UnLock();
}