/******************************************************************************/
/* */
/* X r d O f s E v r . c c */
/* */
/* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include "XrdCms/XrdCmsClient.hh"
#include "XrdOfs/XrdOfsEvr.hh"
#include "XrdOfs/XrdOfsStats.hh"
#include "XrdOfs/XrdOfsTrace.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucTrace.hh"
#include "XrdNet/XrdNetOpts.hh"
#include "XrdNet/XrdNetSocket.hh"
#include "XrdSys/XrdSysHeaders.hh"
/******************************************************************************/
/* E x t e r n a l L i n k a g e s */
/******************************************************************************/
extern XrdOfsStats OfsStats;
extern XrdOucTrace OfsTrace;
void *XrdOfsEvRecv(void *pp)
{
XrdOfsEvr *evr = (XrdOfsEvr *)pp;
evr->recvEvents();
return (void *)0;
}
void *XrdOfsEvFlush(void *pp)
{
XrdOfsEvr *evr = (XrdOfsEvr *)pp;
evr->flushEvents();
return (void *)0;
}
int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
{return 0;}
/******************************************************************************/
/* D e s t r u c t o r */
/******************************************************************************/
XrdOfsEvr::~XrdOfsEvr()
{
// Close the FIFO. This will cause the reader to exit
//
myMutex.Lock();
eventFIFO.Close();
myMutex.UnLock();
}
/******************************************************************************/
/* f l u s h E v e n t s */
/******************************************************************************/
void XrdOfsEvr::flushEvents()
{
theClient *tp, *ntp;
int expWait, expClock;
// Compute the hash flush interval
//
if ((expWait = maxLife/4) == 0) expWait = 60;
expClock = expWait;
// We wait for the right period of time, unless there is a defered event
//
do {myMutex.Lock();
if ((ntp = deferQ)) deferQ = 0;
else runQ = 0;
myMutex.UnLock();
while(ntp)
{XrdSysTimer::Wait(1000*60);
expClock -= 60;
myMutex.Lock();
while((tp = ntp))
{Events.Del(tp->Path);
ntp = tp->Next;
delete tp;
}
if ((ntp = deferQ)) deferQ = 0;
else runQ = 0;
myMutex.UnLock();
if (expClock <= 0)
{myMutex.Lock();
Events.Apply(XrdOfsScrubScan, (void *)0);
myMutex.UnLock();
expClock = expWait;
}
}
mySem.Wait();
} while(1);
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
int XrdOfsEvr::Init(XrdSysError *eobj) // Must be called 1st!
{
XrdNetSocket *msgSock;
char *p, path[2048];
int n;
// Set he error object (need to do only once)
//
eDest = eobj;
// Create path to the pipe we will creat
//
if (!(p = getenv("XRDADMINPATH")) || !*p)
{eobj->Emsg("Events", "XRDADMINPATH not defined");
return 0;
}
strcpy(path, p); n = strlen(p);
if (path[n-1] != '/') {path[n] = '/'; n++;}
strcpy(&path[n], "ofsEvents");
XrdOucEnv::Export("XRDOFSEVENTS", path);
// Now create a socket to a path
//
if (!(msgSock = XrdNetSocket::Create(eobj,path,0,0660,XRDNET_FIFO)))
return 0;
msgFD = msgSock->Detach();
delete msgSock;
// We succeeded and are now ready for the call to he second stage below
//
return 1;
}
/******************************************************************************/
int XrdOfsEvr::Init(XrdCmsClient *trgp)
{
pthread_t tid;
int rc;
// Set the balancer pointers (err object set in 1st phase Init).
//
Balancer = trgp;
// Now start a thread to get incomming messages
//
if ((rc = XrdSysThread::Run(&tid, XrdOfsEvRecv, static_cast(this),
0, "Event receiver")))
{eDest->Emsg("Evr", rc, "create event reader thread");
return 0;
}
// Now start a thread to flush posted events
//
if ((rc = XrdSysThread::Run(&tid, XrdOfsEvFlush,static_cast(this),
0, "Event flusher")))
{eDest->Emsg("Evr", rc, "create event flush thread");
return 0;
}
// All done
//
return 1;
}
/******************************************************************************/
/* r e c v E v e n t s */
/******************************************************************************/
void XrdOfsEvr::recvEvents()
{
EPNAME("recvEvent");
const char *tident = 0;
char *lp,*tp;
// Attach the fifo FD to the stream
//
eventFIFO.Attach(msgFD);
// Now just start reading the events until the FD is closed
//
while((lp = eventFIFO.GetLine()))
{DEBUG("-->" <Emsg("Evr", "Unknown event name -", tp);
}
}
}
/******************************************************************************/
/* W a i t 4 E v e n t */
/******************************************************************************/
void XrdOfsEvr::Wait4Event(const char *path, XrdOucErrInfo *einfo)
{
// Replace original callback with our callback so we can queue this event
// after the wait request has been sent to the client. This avoids a race
// where the client might get the resume signal before the wait request.
//
einfo->setErrCB((XrdOucEICB *)new theClient(this, einfo, path));
}
/******************************************************************************/
/* W o r k 4 E v e n t */
/******************************************************************************/
void XrdOfsEvr::Work4Event(theClient *Client)
{
struct theEvent *anEvent;
theClient *aClient = 0;
// First ste is to see if this event was posted
//
myMutex.Lock();
if (!(anEvent = Events.Find(Client->Path)))
Events.Add(Client->Path, new theEvent(0, 0, Client), maxLife);
else {aClient = anEvent->aClient;
while(aClient)
{if (aClient->evtCB->Same(Client->evtCBarg,aClient->evtCBarg))
{aClient->evtCBarg = Client->evtCBarg;
break;
}
aClient = aClient->Next;
}
if (!aClient) {Client->Next = anEvent->aClient;
anEvent->aClient = Client;
}
if (anEvent->Happened) sendEvent(anEvent);
}
myMutex.UnLock();
// Delete the Client object if we really don't need it
//
if (aClient) delete Client;
}
/******************************************************************************/
/* P r i v a t e M e t h o d s */
/******************************************************************************/
/******************************************************************************/
/* e v e n t S t a g e */
/******************************************************************************/
// stage {OK | ENOENT | BAD} [] \n
void XrdOfsEvr::eventStage()
{
int rc;
char *tp, *eMsg, *altMsg = 0;
struct theEvent *anEvent;
// Get the status token and decode it
//
if (!(tp = eventFIFO.GetToken()))
{eDest->Emsg("Evr", "Missing stage event status"); return;}
if (!strcmp(tp, "OK")) {rc = 0;
OfsStats.Add(OfsStats.Data.numSeventOK);
}
else if (!strcmp(tp, "ENOENT")) {rc = ENOENT;
altMsg = (char *)"file does not exist.";
}
else if (!strcmp(tp, "BAD")) {rc = -1;
OfsStats.Add(OfsStats.Data.numSeventOK);
altMsg = (char *)"Dynamic staging failed.";
}
else {rc = -1;
eDest->Emsg("Evr", "Invalid stage event status -", tp);
altMsg = (char *)"Dynamic staging malfunctioned.";
OfsStats.Add(OfsStats.Data.numSeventOK);
}
// Get the path and optional message
//
if (!(tp = eventFIFO.GetToken(&eMsg)))
{eDest->Emsg("Evr", "Missing stage event path"); return;}
if (rc)
if (eMsg) {while(*eMsg == ' ') eMsg++;
if (!*eMsg) eMsg = altMsg;
} else eMsg = altMsg;
else eMsg = 0;
// At this point if we have a balancer, tell it what happened
//
if (Balancer)
{if (rc == 0) Balancer->Added(tp);
else Balancer->Removed(tp);
}
// Either people are waiting for this event or it is preposted event.
//
myMutex.Lock();
if (!(anEvent = Events.Find(tp)))
Events.Add(tp, new theEvent(rc, eMsg), maxLife);
else {if (anEvent->finalRC == 0)
{anEvent->finalRC = rc;
if (eMsg) anEvent->finalMsg = strdup(eMsg);
anEvent->Happened = 1;
}
if (anEvent->aClient) sendEvent(anEvent);
}
myMutex.UnLock();
}
/******************************************************************************/
/* s e n d E v e n t */
/******************************************************************************/
void XrdOfsEvr::sendEvent(theEvent *ep)
{
theClient *cp;
XrdOucErrInfo *einfo;
int doDel = 0, Result = (ep->finalRC ? SFS_ERROR : SFS_OK);
// For each client, issue a call back sending the result back
// The event also goes in the defered delete queue as we need to hold on
// to it just in case a client is in-transit
//
while((cp = ep->aClient))
{einfo = new XrdOucErrInfo(cp->User, (XrdOucEICB *)0, cp->evtCBarg);
einfo->setErrInfo(ep->finalRC, (ep->finalMsg ? ep->finalMsg : ""));
cp->evtCB->Done(Result, einfo);
ep->aClient = cp->Next;
if (doDel) delete cp;
else {cp->Next = deferQ; deferQ = cp; doDel = 1;}
}
// Post the defer queue handler
//
if (!runQ) {runQ = 1; mySem.Post();}
}