/******************************************************************************/
/* */
/* X r d X r o o t d M o n i t o r . c c */
/* */
/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <unistd.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
#include "XrdVersion.hh"
#include "XrdNet/XrdNetMsg.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "Xrd/XrdScheduler.hh"
#include "XrdXrootd/XrdXrootdMonitor.hh"
#include "XrdXrootd/XrdXrootdMonFile.hh"
#include "XrdXrootd/XrdXrootdTrace.hh"
/******************************************************************************/
/* S t a t i c A l l o c a t i o n */
/******************************************************************************/
// Scheduler and error object; both are installed by Init().
//
XrdScheduler *XrdXrootdMonitor::Sched = 0;
XrdSysError *XrdXrootdMonitor::eDest = 0;
// Server identification record and its length; built by Init().
//
char *XrdXrootdMonitor::idRec = 0;
int XrdXrootdMonitor::idLen = 0;
// Primary and secondary destinations: the destination string and mode mask
// are recorded by Defaults(), the XrdNetMsg objects are created by Init().
//
char *XrdXrootdMonitor::Dest1 = 0;
int XrdXrootdMonitor::monMode1 = 0;
XrdNetMsg *XrdXrootdMonitor::InetDest1 = 0;
char *XrdXrootdMonitor::Dest2 = 0;
int XrdXrootdMonitor::monMode2 = 0;
XrdNetMsg *XrdXrootdMonitor::InetDest2 = 0;
// Global monitor object; Init() allocates it when some destination has a
// mode mask that does not include XROOTD_MON_IO.
//
XrdXrootdMonitor *XrdXrootdMonitor::altMon = 0;
// Held while numMonitor is read or changed (Alloc, Tick, unAlloc).
//
XrdSysMutex XrdXrootdMonitor::windowMutex;
// Time at which Init() ran, stored via htonl().
//
kXR_int32 XrdXrootdMonitor::startTime = 0;
// Redirect monitoring: buffer length in bytes, the buffer table, the next
// buffer Fetch() will hand out, and the mutex Fetch() holds while rotating.
//
int XrdXrootdMonitor::monRlen = 0;
XrdXrootdMonitor::MonRdrBuff
XrdXrootdMonitor::rdrMon[XrdXrootdMonitor::rdrMax];
XrdXrootdMonitor::MonRdrBuff
*XrdXrootdMonitor::rdrMP = 0;
XrdSysMutex XrdXrootdMonitor::rdrMutex;
// Trace buffer length in bytes and the highest entry index of the trace and
// redirect buffers; all three are computed by Defaults(int, ...).
//
int XrdXrootdMonitor::monBlen = 0;
int XrdXrootdMonitor::lastEnt = 0;
int XrdXrootdMonitor::lastRnt = 0;
// 0 -> monitoring disabled, > 0 -> XROOTD_MON_ALL was requested, < 0 ->
// selective mode (see Defaults() and unAlloc()).
//
int XrdXrootdMonitor::isEnabled = 0;
// Number of monitors handed out by Alloc() while in selective mode.
//
int XrdXrootdMonitor::numMonitor = 0;
// Flush controls (seconds) and the time of the next altMon flush.
//
int XrdXrootdMonitor::autoFlash = 0;
int XrdXrootdMonitor::autoFlush = 600;
int XrdXrootdMonitor::FlushTime = 0;
// Interval, in seconds, between identification records.
//
int XrdXrootdMonitor::monIdent = 3600;
// Current window time; updated only by Tick().
//
kXR_int32 XrdXrootdMonitor::currWindow = 0;
// Redirect stream: htonl(currWindow), htonl(window size) and the number of
// redirect buffers to allocate.
//
int XrdXrootdMonitor::rdrTOD = 0;
int XrdXrootdMonitor::rdrWin = 0;
int XrdXrootdMonitor::rdrNum = 3;
// Window size in seconds.
//
kXR_int32 XrdXrootdMonitor::sizeWindow = 60;
// Textual server ID and its length; set by Init(), used by User::Register().
//
char XrdXrootdMonitor::sidName[16]= {0};
short XrdXrootdMonitor::sidSize = 0;
// Per-category switches derived from the mode masks in Defaults().
//
char XrdXrootdMonitor::monINFO = 0;
char XrdXrootdMonitor::monIO = 0;
char XrdXrootdMonitor::monFILE = 0;
char XrdXrootdMonitor::monREDR = 0;
char XrdXrootdMonitor::monUSER = 0;
char XrdXrootdMonitor::monAUTH = 0;
char XrdXrootdMonitor::monACTIVE = 0;
char XrdXrootdMonitor::monFSTAT = 0;
char XrdXrootdMonitor::monCLOCK = 0;
/******************************************************************************/
/* G l o b a l s */
/******************************************************************************/
// Trace object defined elsewhere (presumably what the TRACE() macro consults).
//
extern XrdOucTrace *XrdXrootdTrace;
// Numeric server ID. Init() passes it to XrdOucUtils::Ident() and afterwards
// clears its first two bytes; it is stamped into window marks and into the
// redirect buffers.
//
namespace XrdXrootdMonInfo
{
long long mySID = 0;
}
using namespace XrdXrootdMonInfo;
/******************************************************************************/
/* L o c a l D e f i n e s */
/******************************************************************************/
// setTMark: write a window mark into entry TM_en of trace buffer TM_mb. The
// server ID is stored first and its leading byte is then replaced by the
// record type. Both window fields receive TM_tm (a host order time) converted
// to network byte order, matching what Mark() does for the same fields.
//
// Both macros are wrapped in do/while(0) so that they expand to exactly one
// statement and remain safe inside an unbraced if/else.
//
#define setTMark(TM_mb, TM_en, TM_tm) \
   do {(TM_mb)->info[TM_en].arg0.val    = mySID; \
       (TM_mb)->info[TM_en].arg0.id[0]  = XROOTD_MON_WINDOW; \
       (TM_mb)->info[TM_en].arg1.Window = \
       (TM_mb)->info[TM_en].arg2.Window = \
                            static_cast<kXR_int32>(htonl(TM_tm)); \
      } while(0)

// setTMurk: write a timing mark into entry TM_en of redirect buffer TM_mb.
// rdrWin and TM_tm are stored as is; callers pass values that have already
// been converted with htonl().
//
#define setTMurk(TM_mb, TM_en, TM_tm) \
   do {(TM_mb)->info[TM_en].arg0.Window = rdrWin; \
       (TM_mb)->info[TM_en].arg1.Window = static_cast<kXR_int32>(TM_tm); \
      } while(0)
/******************************************************************************/
/* L o c a l C l a s s e s */
/******************************************************************************/
/******************************************************************************/
/* X r d X r o o t d M o n i t o r _ I d e n t */
/******************************************************************************/
// Scheduler job that sends the identification record and then re-queues
// itself to run again idInt seconds later.
//
class XrdXrootdMonitor_Ident : public XrdJob
{
public:

      XrdXrootdMonitor_Ident(XrdScheduler *sP, int idt)
                            : XrdJob("monitor ident"), Sched(sP), idInt(idt) {}

     ~XrdXrootdMonitor_Ident() {}

// Emit the ident record, then ask the scheduler for the next run
//
void  DoIt()
          {XrdXrootdMonitor::Ident();
           const time_t nextRun = time(0) + idInt;
           Sched->Schedule((XrdJob *)this, nextRun);
          }

private:

XrdScheduler *Sched;     // Scheduler used to re-queue this job
int           idInt;     // Seconds between identification records
};
/******************************************************************************/
/* C l a s s X r d X r o o t d M o n i t o r _ T i c k */
/******************************************************************************/
// Scheduler job that drives the monitor window clock. Each run calls
// XrdXrootdMonitor::Tick() and re-queues itself one window later, unless
// Tick() returns zero or no window interval was set.
//
class XrdXrootdMonitor_Tick : public XrdJob
{
public:

      XrdXrootdMonitor_Tick() : XrdJob("monitor window clock"),
                                Sched(0), Window(0) {}

     ~XrdXrootdMonitor_Tick() {}

// Record the scheduler to use and the window interval in seconds
//
void  Set(XrdScheduler *sp, int intvl) {Sched = sp; Window = intvl;}

// Advance the clock and decide whether it keeps running
//
void  DoIt()
          {
#ifndef NODEBUG
           const char *TraceID = "MonTick";
#endif
           const time_t Now = XrdXrootdMonitor::Tick();
           if (!Window || !Now)
              {TRACE(DEBUG, "Monitor clock stopping.");
               return;
              }
           Sched->Schedule((XrdJob *)this, Now+Window);
          }

private:

XrdScheduler *Sched;     // Scheduler used to re-queue this job
int           Window;    // Seconds between ticks
};
/******************************************************************************/
/* C l a s s X r d X r o o t d M o n i t o r L o c k */
/******************************************************************************/
// Scoped helper that holds the class-wide monitor mutex for the lifetime of
// the object, but only when the monitor passed in is the global one (altMon).
// For any other monitor the object does nothing.
//
class XrdXrootdMonitorLock
{
public:

// Unconditional access to the same mutex
//
static void Lock()   {monLock.Lock();}

static void UnLock() {monLock.UnLock();}

      XrdXrootdMonitorLock(XrdXrootdMonitor *theMonitor)
         : unLock(theMonitor == XrdXrootdMonitor::altMon ? 1 : 0)
         {if (unLock) monLock.Lock();}

     ~XrdXrootdMonitorLock() {if (unLock) monLock.UnLock();}

private:

static XrdSysMutex monLock;   // Shared by every instance
       char        unLock;    // Non-zero when the destructor must unlock
};

XrdSysMutex XrdXrootdMonitorLock::monLock;
/******************************************************************************/
/* X r d X r o o t d M o n i t o r : : U s e r : : D i s a b l e */
/******************************************************************************/
void XrdXrootdMonitor::User::Disable()
{

// Hand the monitor object back if we are holding one
//
   if (Agent != 0)
      {XrdXrootdMonitor::unAlloc(Agent);
       Agent = 0;
      }

// Turn off i/o and file event reporting for this user
//
   Iops = 0;
   Fops = 0;
}
/******************************************************************************/
/* X r d X r o o t d M o n i t o r : : U s e r : : E n a b l e */
/******************************************************************************/
void XrdXrootdMonitor::User::Enable()
{

// Reuse the monitor we already hold, otherwise force-allocate one
//
   if (!Agent) Agent = XrdXrootdMonitor::Alloc(1);

// Without a monitor nothing can be reported
//
   if (!Agent)
      {Fops = 0;
       Iops = 0;
       return;
      }

// Pick up the currently configured i/o and file settings
//
   Iops = XrdXrootdMonitor::monIO;
   Fops = XrdXrootdMonitor::monFILE;
}
/******************************************************************************/
/* X r d X r o o t d M o n i t o r : : U s e r : : R e g i s t e r */
/******************************************************************************/
// Build the monitoring identity of a user and obtain a monitor object.
//
// Uname is the user name (expected form a.b:c@d), Hname the host name to
// record and Pname the protocol name. The resulting identity is
// "<Pname>/<a.b:><sidName>@<Hname>" when Uname has the expected form and
// "<Pname>/<Uname>" otherwise. It is stored in Name (strdup'd) with its
// length in Len.
//
void XrdXrootdMonitor::User::Register(const char *Uname,
                                      const char *Hname,
                                      const char *Pname)
{
   const char *colonP;
   char  uBuff[1024];
   int   n;

// The identification always starts with the protocol being used. All
// formatting is bounded by the buffer size, an oversized name is truncated
// instead of overrunning the stack buffer.
//
   uBuff[0] = 0;
   n = snprintf(uBuff, sizeof(uBuff), "%s/", Pname);
   if (n < 0) n = 0;
      else if (n >= (int)sizeof(uBuff)) n = sizeof(uBuff)-1;

// Decode the user name as a.b:c@d. Everything up to and including the colon
// is kept, followed by the server ID and the host name. The server ID is
// formatted straight from sidName so that the '@' always lands right after
// its last character.
//
   if ((colonP = index(Uname, ':')) && index(colonP+1, '@'))
      snprintf(uBuff+n, sizeof(uBuff)-n, "%.*s%s@%s",
               (int)(colonP - Uname + 1), Uname, sidName, Hname);
      else snprintf(uBuff+n, sizeof(uBuff)-n, "%s", Uname);

// Generate a monitor identity for this user. We do not assign a dictionary
// identifier unless this entry is reported.
//
   Agent = XrdXrootdMonitor::Alloc();
   Did   = 0;
   Len   = strlen(uBuff);
   Name  = strdup(uBuff);
   Iops  = XrdXrootdMonitor::monIO;
   Fops  = XrdXrootdMonitor::monFILE;
}
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
XrdXrootdMonitor::XrdXrootdMonitor()
{

// A zero last window forces a mark on first use; also capture the window
// that is current right now for the opening mark.
//
   lastWindow = 0;
   const kXR_int32 firstWindow = currWindow;

// Obtain a page aligned trace buffer; monBuff is left null on failure
//
   monBuff = (XrdXrootdMonBuff *)memalign(getpagesize(), monBlen);
   if (!monBuff)
      {eDest->Emsg("Monitor", "Unable to allocate monitor buffer.");
       return;
      }

// Entry zero always holds the opening window mark
//
   nextEnt = 1;
   setTMark(monBuff, 0, firstWindow);
}
/******************************************************************************/
/* D e s t r u c t o r */
/******************************************************************************/
XrdXrootdMonitor::~XrdXrootdMonitor()
{

// Nothing to do when the buffer was never allocated
//
   if (!monBuff) return;

// Push out whatever is still buffered, then release the buffer
//
   Flush();
   free(monBuff);
}
/******************************************************************************/
/* a p p I D */
/******************************************************************************/
void XrdXrootdMonitor::appID(char *id)
{
static const int apInfoSize = sizeof(XrdXrootdMonTrace)-4;
// Application ID's are only meaningful for io event recording
//
if (this == altMon || !*id) return;
// Fill out the monitor record
//
if (lastWindow != currWindow) Mark();
else if (nextEnt == lastEnt) Flush();
monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_APPID;
strncpy((char *)(&(monBuff->info[nextEnt])+4), id, apInfoSize);
}
/******************************************************************************/
/* A l l o c */
/******************************************************************************/
XrdXrootdMonitor *XrdXrootdMonitor::Alloc(int force)
{
   XrdXrootdMonitor *mp = 0;

// A monitor is handed out when monitoring is on for everyone, or when it is
// selective and the caller forces it. Without i/o monitoring the global
// object is shared; otherwise a private object is created and is discarded
// again when it could not get a buffer.
//
   if (isEnabled && (isEnabled > 0 || force))
      {if (!monIO) mp = altMon;
          else {mp = new XrdXrootdMonitor();
                if (mp && !mp->monBuff) {delete mp; mp = 0;}
               }
      }

// In selective mode count the monitor and start the clock for the first one
// (the clock is not started here when redirects are being monitored).
//
   if (mp && isEnabled < 0)
      {windowMutex.Lock();
       const bool wasIdle = (numMonitor == 0);
       numMonitor++;
       if (wasIdle && !monREDR) startClock();
       windowMutex.UnLock();
      }

// All done
//
   return mp;
}
/******************************************************************************/
/* C l o s e */
/******************************************************************************/
void XrdXrootdMonitor::Close(kXR_unt32 dictid, long long rTot, long long wTot)
{
   XrdXrootdMonitorLock mLock(this);
   unsigned int rVal, wVal;

// Make sure the next entry is usable
//
   if (lastWindow != currWindow) Mark();
      else if (nextEnt == lastEnt) Flush();

// Scale both totals down to 32 bits, remembering the shift applied to each
//
   const unsigned char rShift = do_Shift(rTot, rVal);
   const unsigned char wShift = do_Shift(wTot, wVal);

// Fill out the close record (the compiler casts the data as needed)
//
   XrdXrootdMonTrace &rec = monBuff->info[nextEnt];
   rec.arg0.id[0]   = XROOTD_MON_CLOSE;
   rec.arg0.id[1]   = rShift;
   rec.arg0.id[2]   = wShift;
   rec.arg0.id[3]   = 0;
   rec.arg0.rTot[1] = htonl(rVal);
   rec.arg1.wTot    = htonl(wVal);
   rec.arg2.dictid  = dictid;
   nextEnt++;

// Pass a copy to the global monitor when there is one and it is not us
//
   if (altMon && this != altMon) altMon->Dup(&rec);
}
/******************************************************************************/
/* D e f a u l t s */
/******************************************************************************/
// Record the monitor destinations and derive the per-category switches from
// their mode masks.
//
// NOTE: call this overload after Defaults(int, ...) below. It reads the
// monFSTAT value that overload sets and only keeps it when a destination
// also asked for XROOTD_MON_FSTA.
//
// dest1/dest2 are strdup'd strings supplied by the caller and are kept here
// (any previously kept strings are freed); mode1/mode2 are the XROOTD_MON_xxx
// masks of the respective destination.
//
void XrdXrootdMonitor::Defaults(char *dest1, int mode1, char *dest2, int mode2)
{
int mmode;
// Make sure that a lone destination always sits in slot 1 and that a missing
// destination has a zero mode.
//
if (!dest1)
{mode1 = (dest1 = dest2) ? mode2 : 0;
dest2 = 0; mode2 = 0;
} else if (!dest2) mode2 = 0;
// Set the default destinations (caller supplied strdup'd strings)
//
if (Dest1) free(Dest1);
Dest1 = dest1; monMode1 = mode1;
if (Dest2) free(Dest2);
Dest2 = dest2; monMode2 = mode2;
// Set overall monitor mode from the union of both masks. isEnabled is 1 when
// XROOTD_MON_ALL was requested and -1 otherwise; monIO is 2 when
// XROOTD_MON_IOV is set; monFILE is also non-zero whenever monIO is.
//
mmode = mode1 | mode2;
monACTIVE = (mmode ? 1 : 0);
isEnabled = (mmode & XROOTD_MON_ALL ? 1 :-1);
monIO = (mmode & XROOTD_MON_IO ? 1 : 0);
monIO = (mmode & XROOTD_MON_IOV ? 2 : monIO);
monINFO = (mmode & XROOTD_MON_INFO ? 1 : 0);
monFILE = (mmode & XROOTD_MON_FILE ? 1 : 0) | monIO;
monREDR = (mmode & XROOTD_MON_REDR ? 1 : 0);
monUSER = (mmode & XROOTD_MON_USER ? 1 : 0);
monAUTH = (mmode & XROOTD_MON_AUTH ? 1 : 0);
monFSTAT = (mmode & XROOTD_MON_FSTA && monFSTAT ? 1 : 0);
// Compute whether or not we need the clock running. Note that monCLOCK is
// only ever set here, it is never cleared again.
//
if (monREDR || (isEnabled > 0 && (monIO || monFILE))) monCLOCK = 1;
// Check where user information should go: monUSER becomes 2 when some
// destination wants both i/o and user records, and 3 when in addition a
// destination wants user records without i/o. Otherwise it keeps the 0/1
// value set above.
//
if (((mode1 & XROOTD_MON_IO) && (mode1 & XROOTD_MON_USER))
|| ((mode2 & XROOTD_MON_IO) && (mode2 & XROOTD_MON_USER)))
{if ((!(mode1 & XROOTD_MON_IO) && (mode1 & XROOTD_MON_USER))
|| (!(mode2 & XROOTD_MON_IO) && (mode2 & XROOTD_MON_USER))) monUSER = 3;
else monUSER = 2;
}
// If we are monitoring redirections then set an envar saying how often idents
// should be sent (this also tips off other layers to handle such monitoring)
//
if (monREDR) XrdOucEnv::Export("XRDMONRDR", monIdent);
// Do final check: without any destination monitoring is disabled
//
if (Dest1 == 0 && Dest2 == 0) isEnabled = 0;
}
/******************************************************************************/
void XrdXrootdMonitor::Defaults(int msz, int rsz, int wsz,
                                int flush, int flash, int idt, int rnm,
                                int fsint, int fsopt, int fsion)
{
   const size_t hdrSize = sizeof(XrdXrootdMonHeader);
   const size_t trcSize = sizeof(XrdXrootdMonTrace);
   const size_t rdrSize = sizeof(XrdXrootdMonRedir);
   const int    maxRWin = 16777215;

// Window size, flush times, ident interval and redirect buffer count; each
// out of range value is replaced by its default.
//
   sizeWindow = (wsz   >  0 ? wsz   : 60);
   autoFlush  = (flush >  0 ? flush : 600);
   autoFlash  = (flash >  0 ? flash : 0);
   monIdent   = (idt   >= 0 ? idt   : 0);
   rdrNum     = (rnm > 0 && rnm <= rdrMax ? rnm : 3);

// The redirect window is capped and kept in network byte order
//
   rdrWin = (sizeWindow < maxRWin ? sizeWindow : maxRWin);
   rdrWin = htonl(rdrWin);

// Set the fstat defaults
//
   XrdXrootdMonFile::Defaults(fsint, fsopt, fsion);
   monFSTAT = fsint != 0;

// Trace buffer: default 16384 bytes, minimum 1024, otherwise rounded down to
// a whole number of trace entries. lastEnt is the index of the final entry.
//
   if (msz <= 0) msz = 16384;
      else if (msz < 1024) msz = 1024;
              else msz = msz/trcSize*trcSize;
   lastEnt = (msz-hdrSize)/trcSize;
   monBlen = (lastEnt*trcSize)+hdrSize;
   lastEnt--;

// Redirect buffer: default 32768 bytes, minimum 2048. The header is followed
// by 16 additional bytes before the entries start.
//
   if (rsz <= 0) rsz = 32768;
      else if (rsz < 2048) rsz = 2048;
   lastRnt = (rsz-(hdrSize + 16))/rdrSize;
   monRlen = (lastRnt*rdrSize)+hdrSize+16;
   lastRnt--;
}
/******************************************************************************/
/* D i s c */
/******************************************************************************/
void XrdXrootdMonitor::Disc(kXR_unt32 dictid, int csec, char Flags)
{
   XrdXrootdMonitorLock mLock(this);
   const bool isLocal = (this != altMon);

// When user records do not belong in the io trace, let the global monitor
// record the disconnect instead.
//
   if (isLocal && monUSER == 1 && altMon)
      {altMon->Disc(dictid, csec);
       return;
      }

// Make sure the next entry is usable
//
   if (lastWindow != currWindow) Mark();
      else if (nextEnt == lastEnt) Flush();

// Fill out the disconnect record. The first word is cleared before the type
// and flag bytes are stored into it.
//
   XrdXrootdMonTrace &rec = monBuff->info[nextEnt];
   rec.arg0.rTot[0] = 0;
   rec.arg0.id[0]   = XROOTD_MON_DISC;
   rec.arg0.id[1]   = Flags;
   rec.arg1.wTot    = htonl(csec);
   rec.arg2.dictid  = dictid;
   nextEnt++;

// Duplicate the record in the global monitor when both streams want it
//
   if (altMon && isLocal && monUSER == 3) altMon->Dup(&rec);
}
/******************************************************************************/
/* D u p */
/******************************************************************************/
void XrdXrootdMonitor::Dup(XrdXrootdMonTrace *mrec)
{
   XrdXrootdMonitorLock mLock(this);

// Make sure the next entry is usable
//
   if (lastWindow != currWindow) Mark();
      else if (nextEnt == lastEnt) Flush();

// Copy the caller's record into that entry and claim it
//
   XrdXrootdMonTrace *slot = &monBuff->info[nextEnt];
   memcpy(slot, (const void *)mrec, sizeof(XrdXrootdMonTrace));
   nextEnt++;
}
/******************************************************************************/
/* Private: F e t c h */
/******************************************************************************/
XrdXrootdMonitor::MonRdrBuff *XrdXrootdMonitor::Fetch()
{

// Hand out the buffer at the head of the ring and make its successor the
// new head. A null head (no redirect buffers set up) is returned as is.
//
   rdrMutex.Lock();
   MonRdrBuff *const headP = rdrMP;
   if (headP) rdrMP = headP->Next;
   rdrMutex.UnLock();

   return headP;
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
// One time setup of the monitoring subsystem.
//
// sp and errp are the scheduler and error object to use; iHost, iProg, iName
// and Port are passed to XrdOucUtils::Ident() to produce the server
// identification. Returns 1 on success (including when monitoring is not
// enabled) and 0 when a destination, buffer or the file statistics monitor
// could not be set up.
//
int XrdXrootdMonitor::Init(XrdScheduler *sp, XrdSysError *errp,
                           const char *iHost, const char *iProg,
                           const char *iName, int Port)
{
   static XrdXrootdMonitor_Ident MonIdent(sp, monIdent);
   XrdXrootdMonMap *mP;
   char  iBuff[1024], iPuff[1024], *sName, *cP;
   int   i, Now = time(0);
   bool  aOK;

// Set static variables
//
   Sched = sp;
   eDest = errp;
   startTime = htonl(Now);

// Generate our server ID. The program/version text is formatted with a
// bounded call so that a long program name cannot overrun iPuff.
//
   strcpy(iBuff, "=/");
   snprintf(iPuff, sizeof(iPuff), "%s&ver=%s", iProg, XrdVERSION);
   sName = XrdOucUtils::Ident(mySID, iBuff+2, sizeof(iBuff)-2,
                              iHost, iPuff, iName, Port);

// Clear the first two bytes of the numeric ID (the first byte is overwritten
// with a record type wherever the ID is stored in a record).
//
   cP = (char *)&mySID; *cP = 0; *(cP+1) = 0;

// Keep the textual ID, truncated to fit. The length is taken from the stored
// copy so that sidSize always matches what sidName really holds.
//
   if (strlen(sName) >= sizeof(sidName)) sName[sizeof(sidName)-1] = 0;
   strcpy(sidName, sName);
   sidSize = static_cast<short>(strlen(sidName));
   free(sName);

// There is nothing to do unless we have been enabled via Defaults()
//
   if (!isEnabled) return 1;

// Setup the primary destination
//
   InetDest1 = new XrdNetMsg(eDest, Dest1, &aOK);
   if (!aOK)
      {eDest->Emsg("Monitor", "Unable to setup primary monitor collector.");
       return 0;
      }

// Setup the secondary destination
//
   if (Dest2)
      {InetDest2 = new XrdNetMsg(eDest, Dest2, &aOK);
       if (!aOK)
          {eDest->Emsg("Monitor","Unable to setup secondary monitor collector.");
           return 0;
          }
      }

// If there is a destination that is only collecting file events, then
// allocate a global monitor object but don't start the timer just yet.
//
   if ((monMode1 && !(monMode1 & XROOTD_MON_IO))
   ||  (monMode2 && !(monMode2 & XROOTD_MON_IO)))
      if (!(altMon = new XrdXrootdMonitor()) || !altMon->monBuff)
         {if (altMon) {delete altMon; altMon = 0;}
          eDest->Emsg("Monitor","allocate monitor; insufficient storage.");
          return 0;
         }

// Turn on the monitoring clock if we need it running all the time
//
   if (monCLOCK) startClock();

// Create identification record (header, dictionary id and the ident text)
//
   idLen = strlen(iBuff) + sizeof(XrdXrootdMonHeader) + sizeof(kXR_int32);
   idRec = (char *)malloc(idLen+1);
   if (!idRec)
      {eDest->Emsg("Monitor", "Unable to allocate monitor ident record.");
       return 0;
      }
   mP = (XrdXrootdMonMap *)idRec;
   fillHeader(&(mP->hdr), XROOTD_MON_MAPIDNT, idLen);
   mP->hdr.pseq = 0;
   mP->dictid   = 0;
   strcpy(mP->info, iBuff);

// Now schedule the first identification record
//
   if (Sched && monIdent) Sched->Schedule((XrdJob *)&MonIdent);

// If we are monitoring file stats then start that up
//
   if (!Sched || !monFSTAT) monFSTAT = 0;
      else if (!XrdXrootdMonFile::Init(Sched, eDest)) return 0;

// If we are not monitoring redirections, we are done!
//
   if (!monREDR) return 1;

// Allocate as many redirection monitors as requested. Each buffer points to
// its predecessor; the first one is pointed at the last one after the loop,
// which closes the ring.
//
   for (i = 0; i < rdrNum; i++)
       {rdrMon[i].Buff = (XrdXrootdMonBurr *)memalign(getpagesize(),monRlen);
        if (!rdrMon[i].Buff)
           {eDest->Emsg("Monitor", "Unable to allocate monitor rdr buffer.");
            return 0;
           }
        rdrMon[i].Buff->sID = mySID;
        rdrMon[i].Buff->sXX[0] = XROOTD_MON_REDSID;
        rdrMon[i].Next = (i ? &rdrMon[i-1] : &rdrMon[0]);
        rdrMon[i].nextEnt = 0;
        rdrMon[i].flushIt = Now + autoFlush;
        rdrMon[i].lastTOD = 0;
       }
   rdrMon[0].Next = &rdrMon[i-1];
   rdrMP = &rdrMon[0];

// All done
//
   return 1;
}
/******************************************************************************/
/* Private: G e t D i c t I D */
/******************************************************************************/
kXR_unt32 XrdXrootdMonitor::GetDictID()
{
   static XrdSysMutex  seqMutex;
   static unsigned int monSeqID = 1;

// Take the next number from the counter while holding the lock
//
   seqMutex.Lock();
   const unsigned int mySeqID = monSeqID;
   monSeqID++;
   seqMutex.UnLock();

// The ID is handed out in network byte order
//
   return htonl(mySeqID);
}
/******************************************************************************/
/* Private: M a p */
/******************************************************************************/
// Build a map record "<user name>[\n<path>]" with a fresh dictionary ID and
// send it to every destination interested in records of this kind.
//
// code selects the record type (XROOTD_MON_MAPPATH, XROOTD_MON_MAPUSER, or
// anything else for an info record), uInfo supplies the user name and its
// length, path is optional. Returns the dictionary ID that was assigned.
//
kXR_unt32 XrdXrootdMonitor::Map(char code, XrdXrootdMonitor::User &uInfo,
                                const char *path)
{
   XrdXrootdMonMap map;
   const int infoMax = static_cast<int>(sizeof(map.info));
   int size, montype;

// Copy in the username. The copy is bounded by the info area and the length
// used from here on is clamped to what the area can hold.
//
   map.dictid = GetDictID();
   strlcpy(map.info, uInfo.Name, sizeof(map.info));
   size = uInfo.Len;
   if (size < 0) size = 0;
      else if (size >= infoMax) size = infoMax-1;

// Append the path after a newline. Only the bytes that were really copied
// are counted, so the packet length never extends past the map structure.
//
   if (path)
      {const int room = infoMax - size - 1;
       int pLen = static_cast<int>(strlen(path));
       *(map.info+size) = '\n';
       if (room <= 0) pLen = 0;
          else {strlcpy(map.info+size+1, path, room);
                if (pLen >= room) pLen = room-1;
               }
       size = size + pLen + 1;
      }

// Fill in the header
//
   size = sizeof(XrdXrootdMonHeader)+sizeof(kXR_int32)+size;
   fillHeader(&map.hdr, code, size);

// Route the packet to all destinations that need them
//
        if (code == XROOTD_MON_MAPPATH) montype = XROOTD_MON_PATH;
   else if (code == XROOTD_MON_MAPUSER) montype = XROOTD_MON_USER;
   else                                 montype = XROOTD_MON_INFO;
   Send(montype, (void *)&map, size);

// Return the dictionary id
//
   return map.dictid;
}
/******************************************************************************/
/* O p e n */
/******************************************************************************/
void XrdXrootdMonitor::Open(kXR_unt32 dictid, off_t fsize)
{
   XrdXrootdMonitorLock mLock(this);

// Make sure the next entry is usable
//
   if (lastWindow != currWindow) Mark();
      else if (nextEnt == lastEnt) Flush();

// Fill out the open record. The file size is stored first because the
// record type byte is then written over the start of that same field.
//
   XrdXrootdMonTrace &rec = monBuff->info[nextEnt];
   h2nll(fsize, rec.arg0.val);
   rec.arg0.id[0]  = XROOTD_MON_OPEN;
   rec.arg1.buflen = 0;
   rec.arg2.dictid = dictid;
   nextEnt++;

// Check if we need to duplicate this entry
//
   if (altMon && this != altMon) altMon->Dup(&rec);
}
/******************************************************************************/
/* R e d i r e c t */
/******************************************************************************/
// Record a redirect event in one of the redirect buffers.
//
// mID is the dictionary ID to record, hName the target host (anything from a
// '?' on is ignored) or, when it starts with '/', a path that replaces Path.
// Port is the target port, opC is or'd into the record type and Path is the
// path being redirected. The text "host:path" is stored in the entries that
// follow the record. Always returns 0.
//
int XrdXrootdMonitor::Redirect(kXR_unt32 mID, const char *hName, int Port,
                               char opC, const char *Path)
{
   XrdXrootdMonRedir *mtP;
   MonRdrBuff *mP = Fetch();
   int n, slots, hLen, pLen;
   char *dest;

// Take care of the server's name which might actually be a path
//
   if (*hName == '/') {Path = hName; hName = ""; hLen = 0;}
      else {const char *quest = index(hName, '?');
            hLen = (quest ? quest - hName : strlen(hName));
            if (hLen > 256) hLen = 256;
           }

// Take care of the path
//
   pLen = strlen(Path);
   if (pLen > 1024) pLen = 1024;

// Compute number of entries needed here. Afterwards pLen is the space left
// in those entries behind "host:", so that strncpy() below truncates or NUL
// pads the path to exactly fill them.
//
   n = (hLen + 1 + pLen + 1); // "<host>:<path>\0"
   slots = n / sizeof(XrdXrootdMonRedir);
   if (n % sizeof(XrdXrootdMonRedir)) slots++;
   pLen = slots * sizeof(XrdXrootdMonRedir) - (hLen+1);

// Obtain a lock on this buffer
//
   if (!mP) return 0;
   mP->Mutex.Lock();

// If we don't have enough slots, flush this buffer. Note that we account for
// the ending timing mark here (an extra slot).
//
   if (mP->nextEnt + slots + 2 >= lastRnt) Flush(mP);

// Check if we need a timing mark
//
   if (mP->lastTOD != rdrTOD)
      {mP->lastTOD = rdrTOD;
       setTMurk(mP->Buff, mP->nextEnt, mP->lastTOD);
       mP->nextEnt++;
      }

// Fill out the buffer
//
   mtP = &(mP->Buff->info[mP->nextEnt]);
   mtP->arg0.rdr.Type = XROOTD_MON_REDIRECT | opC;
   mtP->arg0.rdr.Dent = static_cast<char>(slots);
   mtP->arg0.rdr.Port = htons(static_cast<unsigned short>(Port));
   mtP->arg1.dictid   = mID;
   dest = (char *)(mtP+1);
   strncpy(dest, hName,hLen); dest += hLen; *dest++ = ':';
   strncpy(dest, Path, pLen);

// Adjust pointer and return
//
   mP->nextEnt = mP->nextEnt + (slots+1);
   mP->Mutex.UnLock();
   return 0;
}
/******************************************************************************/
/* T i c k */
/******************************************************************************/
time_t XrdXrootdMonitor::Tick()
{
time_t Now = time(0);
int nextFlush;
// We can safely set the window as we are the only ones doing so and memory
// access is atomic as long as it sits within a cache line (which it does).
//
currWindow = static_cast(Now);
rdrTOD = htonl(currWindow);
nextFlush = currWindow + autoFlush;
// Check to see if we should flush the alternate monitor
//
if (altMon && currWindow >= FlushTime)
{XrdXrootdMonitorLock::Lock();
if (currWindow >= FlushTime)
{if (altMon->nextEnt > 1) altMon->Flush();
else FlushTime = nextFlush;
}
XrdXrootdMonitorLock::UnLock();
}
// Now check to see if we need to flush redirect buffers
//
if (monREDR)
{int n = rdrNum;
while(n--)
{rdrMon[n].Mutex.Lock();
if (rdrMon[n].nextEnt == 0) rdrMon[n].flushIt = nextFlush;
else if (rdrMon[n].flushIt <= currWindow) Flush(&rdrMon[n]);
rdrMon[n].Mutex.UnLock();
}
}
// All done. Stop the clock if there is no reason for it to be running. The
// clock always runs if we are monitoring redirects or all clients. Otherwise,
// the clock only runs if we have a one or more client-specific monitors.
//
if (!monREDR && isEnabled < 0)
{windowMutex.Lock();
if (!numMonitor) Now = 0;
windowMutex.UnLock();
}
return Now;
}
/******************************************************************************/
/* u n A l l o c */
/******************************************************************************/
void XrdXrootdMonitor::unAlloc(XrdXrootdMonitor *monp)
{

// Private monitors are destroyed; the global one is never deleted here
//
   if (monp != altMon) delete monp;

// Only selective mode keeps a count of outstanding monitors
//
   if (isEnabled >= 0) return;

   windowMutex.Lock();
   numMonitor--;
   windowMutex.UnLock();
}
/******************************************************************************/
/* P r i v a t e M e t h o d s */
/******************************************************************************/
/******************************************************************************/
/* d o _ S h i f t */
/******************************************************************************/
// Scale a 63 bit total down until it fits in 32 bits.
//
// The sign bit of xTot is dropped, then the value is shifted right one bit
// at a time while any of bits 32 through 62 is set. The scaled value is
// returned in xVal and the number of shifts applied is the return value.
//
unsigned char XrdXrootdMonitor::do_Shift(long long xTot, unsigned int &xVal)
{
   const long long smask = 0x7fffffff00000000LL;
   const long long xmask = 0x7fffffffffffffffLL;
   unsigned char xshift = 0;

   xTot &= xmask;
   while(xTot & smask) {xTot = xTot >> 1LL; xshift++;}
   xVal = static_cast<unsigned int>(xTot);

   return xshift;
}
/******************************************************************************/
/* f i l l H e a d e r */
/******************************************************************************/
// Fill in a packet header: record code id, an 8 bit sequence number that
// advances with every call, the packet length size in network byte order and
// the server start time.
//
void XrdXrootdMonitor::fillHeader(XrdXrootdMonHeader *hdr,
                                  const char          id, int size)
{  static XrdSysMutex seqMutex;
   static int         seq = 0;
          int         myseq;

// Generate a new sequence number (only the low order 8 bits are used)
//
   seqMutex.Lock();
   myseq = 0x00ff & (seq++);
   seqMutex.UnLock();

// Fill in the header
//
   hdr->code = static_cast<unsigned char>(id);
   hdr->pseq = static_cast<unsigned char>(myseq);
   hdr->plen = htons(static_cast<unsigned short>(size));
   hdr->stod = startTime;
}
/******************************************************************************/
/* F l u s h */
/******************************************************************************/
void XrdXrootdMonitor::Flush()
{
int size;
kXR_int32 localWindow, now;
// Do not flush if the buffer is empty
//
if (nextEnt <= 1) return;
// Get the current window marker. No need for locks as simple memory accesses
// are sufficiently synchrnozed for our purposes.
//
localWindow = currWindow;
// Fill in the header and in the process we will have the current time
//
size = (nextEnt+1)*sizeof(XrdXrootdMonTrace)+sizeof(XrdXrootdMonHeader);
fillHeader(&monBuff->hdr, XROOTD_MON_MAPTRCE, size);
// Punt on the right ending time. We are trying to keep same-sized windows
// This was corrected by Matevz Tadel, as before we were using real time which
// could have been far into the future due to simple inactivity. So, Place the
// computed ending timing mark.
//
now = lastWindow + sizeWindow;
setTMark(monBuff, nextEnt, now);
// Send off the buffer and reinitialize it
//
if (this != altMon) Send(XROOTD_MON_IO, (void *)monBuff, size);
else {Send(XROOTD_MON_FILE, (void *)monBuff, size);
FlushTime = localWindow + autoFlush;
}
setTMark(monBuff, 0, localWindow);
nextEnt = 1;
}
/******************************************************************************/
// Send off the contents of redirect buffer mP and reset it. No lock is taken
// here; the callers visible in this file (Redirect and Tick) lock mP->Mutex
// before calling.
//
void XrdXrootdMonitor::Flush(XrdXrootdMonitor::MonRdrBuff *mP)
{
   int size;

// Reset flush time but do not flush a buffer holding at most one entry. We
// use the current time to make sure a record at least sits in the buffer a
// full flush period.
//
   mP->flushIt = static_cast<kXR_int32>(time(0)) + autoFlush;
   if (mP->nextEnt <= 1) return;

// Set ending timing mark and force a new one on the next fill
//
   setTMurk(mP->Buff, mP->nextEnt, rdrTOD);
   mP->lastTOD = 0;

// Fill in the header; the size includes the ending timing mark
//
   size = (mP->nextEnt+1)*sizeof(XrdXrootdMonRedir)+sizeof(XrdXrootdMonHeader)+8;
   fillHeader(&(mP->Buff->hdr), XROOTD_MON_MAPREDR, size);

// Send off the buffer and reinitialize it
//
   Send(XROOTD_MON_REDR, (void *)(mP->Buff), size);
   mP->nextEnt = 0;
}
/******************************************************************************/
/* M a r k */
/******************************************************************************/
void XrdXrootdMonitor::Mark()
{
kXR_int32 localWindow;
// Get the current window marker. Since simple memory accesses are sufficiently
// synchronized, no need to lock this.
//
localWindow = currWindow;
// Using an update provided by Matevz Tadel, UCSD, if this is an I/O buffer
// mark then we will also flush the I/O buffer if all the following hold:
// a) flushing enabled, b) buffer not empty, and c) covers the flush time.
// We would normally do this during Tick() but that would require too much
// locking in the middle of an I/O path, so we do psudo timed flushing.
//
if (this != altMon && autoFlash && nextEnt > 1)
{kXR_int32 bufStartWindow =
static_cast(ntohl(monBuff->info[0].arg2.Window));
if (localWindow - bufStartWindow >= autoFlash)
{Flush();
lastWindow = localWindow;
return;
}
}
// Now, optimize placing the window mark in the buffer. Using another MT fix we
// set the end of the previous window to be lastwindow + sizeWindow (instead of
// localWindow) to prevent windows from being wrongly zero sized.
//
if (monBuff->info[nextEnt-1].arg0.id[0] == XROOTD_MON_WINDOW)
{
monBuff->info[nextEnt-1].arg2.Window =
static_cast(htonl(localWindow));
}
else if (nextEnt+8 > lastEnt)
{
Flush();
}
else
{
monBuff->info[nextEnt].arg0.val = mySID;
monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_WINDOW;
monBuff->info[nextEnt].arg1.Window =
static_cast(htonl(lastWindow + sizeWindow));
monBuff->info[nextEnt].arg2.Window =
static_cast(htonl(localWindow));
nextEnt++;
}
lastWindow = localWindow;
}
/******************************************************************************/
/* S e n d */
/******************************************************************************/
int XrdXrootdMonitor::Send(int monMode, void *buff, int blen)
{
#ifndef NODEBUG
const char *TraceID = "Monitor";
#endif
static XrdSysMutex sendMutex;
int rc1, rc2;
sendMutex.Lock();
if (monMode & monMode1 && InetDest1)
{rc1 = InetDest1->Send((char *)buff, blen);
TRACE(DEBUG,blen <<" bytes sent to " <