/******************************************************************************/ /* */ /* X r d X r o o t d M o n F i l e . c c */ /* */ /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include "Xrd/XrdScheduler.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysPlatform.hh" #include "XrdXrootd/XrdXrootdMonFile.hh" #include "XrdXrootd/XrdXrootdFileStats.hh" /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ namespace XrdXrootdMonInfo { extern long long mySID; } /******************************************************************************/ /* S t a t i c M e m b e r s */ /******************************************************************************/ XrdSysError *XrdXrootdMonFile::eDest = 0; XrdScheduler *XrdXrootdMonFile::Sched = 0; XrdSysMutex XrdXrootdMonFile::bfMutex; XrdSysMutex XrdXrootdMonFile::fmMutex; XrdXrootdMonFMap XrdXrootdMonFile::fmMap[XrdXrootdMonFMap::mapNum]; short XrdXrootdMonFile::fmUse[XrdXrootdMonFMap::mapNum] = {0}; char *XrdXrootdMonFile::repBuff = 0; XrdXrootdMonHeader *XrdXrootdMonFile::repHdr = 0; XrdXrootdMonFileTOD *XrdXrootdMonFile::repTOD = 0; char *XrdXrootdMonFile::repNext = 0; char *XrdXrootdMonFile::repFirst = 0; char *XrdXrootdMonFile::repLast = 0; int XrdXrootdMonFile::totRecs = 0; int XrdXrootdMonFile::xfrRecs = 0; int XrdXrootdMonFile::repSize = 0; int XrdXrootdMonFile::repTime = 0; int XrdXrootdMonFile::fmHWM =-1; int XrdXrootdMonFile::crecSize = 0; int XrdXrootdMonFile::xfrCnt = 0; int XrdXrootdMonFile::xfrRem = 0; XrdXrootdMonFileXFR XrdXrootdMonFile::xfrRec; short XrdXrootdMonFile::crecNLen = 0; short XrdXrootdMonFile::trecNLen = 0; char XrdXrootdMonFile::fsLFN = 0; char XrdXrootdMonFile::fsLVL = 0; char XrdXrootdMonFile::fsOPS = 0; char XrdXrootdMonFile::fsSSQ = 0; char XrdXrootdMonFile::fsXFR = 0; char XrdXrootdMonFile::crecFlag = 0; /******************************************************************************/ /* C l o s e */ /******************************************************************************/ void XrdXrootdMonFile::Close(XrdXrootdFileStats *fsP, bool isDisc) { XrdXrootdMonFileCLS cRec; char *cP; int iEnt, iMap, iSlot; // If this object was registered for I/O reporting, deregister it. // if (fsP->MonEnt != -1) {iEnt = fsP->MonEnt & 0xffff; iMap = iEnt >> XrdXrootdMonFMap::fmShft; iSlot = iEnt & XrdXrootdMonFMap::fmMask; fsP->MonEnt = -1; fmMutex.Lock(); if (fmMap[iMap].Free(iSlot)) fmUse[iMap]--; if (iMap == fmHWM) while(fmHWM >= 0 && !fmUse[fmHWM]) fmHWM--; fmMutex.UnLock(); } // Insert a close record header (mostly precomputed) // cRec.Hdr.recType = XrdXrootdMonFileHdr::isClose; cRec.Hdr.recFlag = crecFlag; if (isDisc) cRec.Hdr.recFlag |= XrdXrootdMonFileHdr::forced; cRec.Hdr.recSize = crecNLen; cRec.Hdr.fileID = fsP->FileID; // Insert the I/O bytes // cRec.Xfr.read = htonll(fsP->xfr.read); cRec.Xfr.readv = htonll(fsP->xfr.readv); cRec.Xfr.write = htonll(fsP->xfr.write); // Insert ops if so wanted // if (fsOPS) {cRec.Ops.read = htonl (fsP->ops.read); if (fsP->ops.read) {cRec.Ops.rdMin = htonl (fsP->ops.rdMin); cRec.Ops.rdMax = htonl (fsP->ops.rdMax); } else { cRec.Ops.rdMin = cRec.Ops.rdMax = 0; } cRec.Ops.readv = htonl (fsP->ops.readv); cRec.Ops.rsegs = htonll(fsP->ops.rsegs); if (fsP->ops.readv) {cRec.Ops.rsMin = htons (fsP->ops.rsMin); cRec.Ops.rsMax = htons (fsP->ops.rsMax); cRec.Ops.rvMin = htonl (fsP->ops.rvMin); cRec.Ops.rvMax = htonl (fsP->ops.rvMax); } else { cRec.Ops.rsMin = cRec.Ops.rsMax = 0; cRec.Ops.rvMin = cRec.Ops.rvMax = 0; } cRec.Ops.write = htonl (fsP->ops.write); if (fsP->ops.write) {cRec.Ops.wrMin = htonl (fsP->ops.wrMin); cRec.Ops.wrMax = htonl (fsP->ops.wrMax); } else { cRec.Ops.wrMin = cRec.Ops.wrMax = 0; } } // Record sum of squares if so needed // if (fsSSQ) {XrdXrootdMonDouble xval; xval.dreal = fsP->ssq.read; cRec.Ssq.read.dlong = htonll(xval.dlong); xval.dreal = fsP->ssq.readv; cRec.Ssq.readv.dlong = htonll(xval.dlong); xval.dreal = fsP->ssq.rsegs; cRec.Ssq.rsegs.dlong = htonll(xval.dlong); xval.dreal = fsP->ssq.write; cRec.Ssq.write.dlong = htonll(xval.dlong); } // Get a pointer to the next slot (the buffer gets locked) // cP = GetSlot(crecSize); memcpy(cP, &cRec, crecSize); bfMutex.UnLock(); } /******************************************************************************/ /* D e f a u l t s */ /******************************************************************************/ void XrdXrootdMonFile::Defaults(int intv, int opts, int xfrcnt) { // Set the reporting interval and I/O counter // repTime = intv; xfrCnt = xfrcnt; xfrRem = xfrcnt; // Expand out the options // fsXFR = (opts & XROOTD_MON_FSXFR) != 0; fsLFN = (opts & XROOTD_MON_FSLFN) != 0; fsOPS = (opts & (XROOTD_MON_FSOPS | XROOTD_MON_FSSSQ)) != 0; fsSSQ = (opts & XROOTD_MON_FSSSQ) != 0; // Set monitoring level // if (fsSSQ) fsLVL = XrdXrootdFileStats::monSsq; else if (fsOPS) fsLVL = XrdXrootdFileStats::monOps; else if (intv) fsLVL = XrdXrootdFileStats::monOn; else fsLVL = XrdXrootdFileStats::monOff; } /******************************************************************************/ /* D i s c */ /******************************************************************************/ void XrdXrootdMonFile::Disc(unsigned int usrID) { static short drecSize = htons(sizeof(XrdXrootdMonFileDSC)); XrdXrootdMonFileDSC *dP; // Get a pointer to the next slot (the buffer gets locked) // dP = (XrdXrootdMonFileDSC *)GetSlot(sizeof(XrdXrootdMonFileDSC)); // Fill out the record. It's pretty simple // dP->Hdr.recType = XrdXrootdMonFileHdr::isDisc; dP->Hdr.recFlag = 0; dP->Hdr.recSize = drecSize; dP->Hdr.userID = usrID; bfMutex.UnLock(); } /******************************************************************************/ /* D o I t */ /******************************************************************************/ void XrdXrootdMonFile::DoIt() { // First check if we need to report all the I/O stats // xfrRem--; if (!xfrRem) DoXFR(); // Check if we should flush the buffer // bfMutex.Lock(); if (repNext) Flush(); bfMutex.UnLock(); // Reschedule ourselves // XrdXrootdMonitor::Sched->Schedule((XrdJob *)this, time(0)+repTime); } /******************************************************************************/ /* Private: D o X F R */ /******************************************************************************/ void XrdXrootdMonFile::DoXFR() { XrdXrootdFileStats *fsP; int keep, i, n, hwm; // Reset interval counter // xfrRem = xfrCnt; // Grab the high watermark once // fmMutex.Lock(); hwm = fmHWM; fmMutex.UnLock(); // Report on all the files we have registered. This is a CPU burner as we // periodically drop the lock to allow open/close requests to come through. // for (i = 0; i <= hwm; i++) {fmMutex.Lock(); if (fmUse[i]) {n = 0; keep = XrdXrootdMonFMap::fmHold; while((fsP = fmMap[i].Next(n))) {if (fsP->xfrXeq) DoXFR(fsP); if (!keep--) {fmMutex.UnLock(); keep = XrdXrootdMonFMap::fmHold; fmMutex.Lock(); } } } fmMutex.UnLock(); } } /******************************************************************************/ void XrdXrootdMonFile::DoXFR(XrdXrootdFileStats *fsP) { long long xfrRead, xfrReadv, xfrWrite; char *cP; // Turn off the activity flag // fsP->xfrXeq = 0; // Grab the I/O bytes to get a somewhat consistent image here // xfrRead = fsP->xfr.read; xfrReadv = fsP->xfr.readv; xfrWrite = fsP->xfr.write; // Complete the record // xfrRec.Hdr.fileID = fsP->FileID; xfrRec.Xfr.read = htonll(xfrRead); xfrRec.Xfr.readv = htonll(xfrReadv); xfrRec.Xfr.write = htonll(xfrWrite); // Get a pointer to the next slot (the buffer gets locked) // cP = GetSlot(sizeof(xfrRec)); memcpy(cP, &xfrRec, sizeof(xfrRec)); xfrRecs++; bfMutex.UnLock(); } /******************************************************************************/ /* I n i t */ /******************************************************************************/ bool XrdXrootdMonFile::Init(XrdScheduler *sp, XrdSysError *errp, int bfsz) { XrdXrootdMonFile *mfP; int alignment, pagsz = getpagesize(); // Set the variables // Sched = sp; eDest = errp; // Allocate a socket buffer // alignment = (bfsz < pagsz ? 1024 : pagsz); if (posix_memalign((void **)&repBuff, alignment, bfsz)) {eDest->Emsg("MonFile", "Unable to allocate monitor buffer."); return false; } // Set the header (always present) // repHdr = (XrdXrootdMonHeader *)repBuff; repHdr->code = XROOTD_MON_MAPFSTA; repHdr->pseq = 0; repHdr->stod = XrdXrootdMonitor::startTime; // Set the time record (always present) // repTOD = (XrdXrootdMonFileTOD *)(repBuff + sizeof(XrdXrootdMonHeader)); repTOD->Hdr.recType = XrdXrootdMonFileHdr::isTime; repTOD->Hdr.recFlag = XrdXrootdMonFileHdr::hasSID; repTOD->Hdr.recSize = htons(sizeof(XrdXrootdMonFileTOD)); repTOD->sID = static_cast(XrdXrootdMonInfo::mySID); // Establish first real record in the buffer (always fixed) // repFirst = repBuff+sizeof(XrdXrootdMonHeader)+sizeof(XrdXrootdMonFileTOD); // Calculate the end nut the next slot always starts with a null pointer // repLast = repBuff+bfsz-1; repNext = 0; // Calculate the close record size and the initial flags // crecSize = sizeof(XrdXrootdMonFileHdr) + sizeof(XrdXrootdMonStatXFR); if (fsSSQ || fsOPS) {crecSize += sizeof(XrdXrootdMonStatOPS); crecFlag = XrdXrootdMonFileHdr::hasOPS; } else crecFlag = 0; if (fsSSQ) {crecSize += sizeof(XrdXrootdMonStatSSQ); crecFlag |= XrdXrootdMonFileHdr::hasSSQ; } crecNLen = htons(static_cast(crecSize)); // Preformat the i/o record // xfrRec.Hdr.recType = XrdXrootdMonFileHdr::isXfr; xfrRec.Hdr.recFlag = 0; xfrRec.Hdr.recSize = htons(static_cast(sizeof(xfrRec))); // Calculate the tod record size // trecNLen = htons(static_cast(sizeof(XrdXrootdMonFileTOD))); // Allocate an instance of ourselves so we can schedule ourselves // mfP = new XrdXrootdMonFile(); // Schedule an the flushes // XrdXrootdMonitor::Sched->Schedule((XrdJob *)mfP, time(0)+repTime); return true; } /******************************************************************************/ /* Private: F l u s h */ /******************************************************************************/ void XrdXrootdMonFile::Flush() // The bfMutex must be locked { static int seq = 0; int bfSize; // Update the sequence number // repHdr->pseq = static_cast(0x00ff & seq++); // Insert ending timestamp and record counts // repTOD->Hdr.nRecs[0] = htons(static_cast(xfrRecs)); repTOD->Hdr.nRecs[1] = htons(static_cast(totRecs)); repTOD->tEnd = htonl(static_cast(time(0))); // Calculate buffer size and stick into the header // bfSize = (repNext - repBuff); repHdr->plen = htons(static_cast(bfSize)); repNext = 0; // Write this out // XrdXrootdMonitor::Send(XROOTD_MON_FSTA, repBuff, bfSize); repTOD->tBeg = repTOD->tEnd; xfrRecs = totRecs = 0; } /******************************************************************************/ /* Private: G e t S l o t */ /******************************************************************************/ char *XrdXrootdMonFile::GetSlot(int slotSZ) { char *myRec; // Lock this code to prevent interference (we should use double buffering) // Note that the caller must do the unlock when finished with the slot. // bfMutex.Lock(); // Check if we need to flush the buffer (sets repNext to zero). Otherwise, // if this is the first record insert a timestamp. // if (repNext) {if ((repNext + slotSZ) > repLast) {Flush(); repNext = repFirst; } } else { repTOD->tBeg = htonl(static_cast(time(0))); repNext = repFirst; } // Return the slot // totRecs++; myRec = repNext; repNext += slotSZ; return myRec; } /******************************************************************************/ /* O p e n */ /******************************************************************************/ void XrdXrootdMonFile::Open(XrdXrootdFileStats *fsP, const char *Path, unsigned int uDID, bool isRW) { static const int minRecSz = sizeof(XrdXrootdMonFileOPN) - sizeof(XrdXrootdMonFileLFN); XrdXrootdMonFileOPN *oP; int i = 0, sNum = -1, rLen, pLen = 0; // Assign the path a dictionary id if not assigned via file monitoring // if (fsP->FileID == 0) fsP->FileID = XrdXrootdMonitor::GetDictID(); // Add this open to the map table if we are doing I/O stats. // if (fsXFR) {fmMutex.Lock(); for (i = 0; i < XrdXrootdMonFMap::mapNum; i++) if (fmUse[i] < XrdXrootdMonFMap::fmSize) {if ((sNum = fmMap[i].Insert(fsP)) >= 0) {fmUse[i]++; if (i > fmHWM) fmHWM = i; break; } } fmMutex.UnLock(); } // Generate the cookie (real or virtual) to find the entry in the map table. // Supply the monitoring options for effeciency. // fsP->MonEnt = (sNum | (i << XrdXrootdMonFMap::fmShft)) & 0xffff; fsP->monLvl = fsLVL; fsP->xfrXeq = 0; // Compute the size of this record // rLen = minRecSz; if (fsLFN) {pLen = strlen(Path); rLen += sizeof(kXR_unt32) + pLen; i = (rLen + 8) & ~0x00000003; pLen = pLen + (i - rLen); rLen = i; } // Get a pointer to the next slot (the buffer gets locked) // oP = (XrdXrootdMonFileOPN *)GetSlot(rLen); // Fill out the record // oP->Hdr.recType = XrdXrootdMonFileHdr::isOpen; oP->Hdr.recFlag = (isRW ? XrdXrootdMonFileHdr::hasRW : 0); oP->Hdr.recSize = htons(static_cast(rLen)); oP->Hdr.fileID = fsP->FileID; oP->fsz = htonll(fsP->fSize); // Append user and path if so wanted (sizes have been verified) // if (fsLFN) {oP->Hdr.recFlag |= XrdXrootdMonFileHdr::hasLFN; oP->ufn.user = uDID; strncpy(oP->ufn.lfn, Path, pLen); } bfMutex.UnLock(); }