/******************************************************************************/
/* */
/* X r d C m s R R Q . c c */
/* */
/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include "XrdCms/XrdCmsCluster.hh"
#include "XrdCms/XrdCmsNode.hh"
#include "XrdCms/XrdCmsRRQ.hh"
#include "XrdCms/XrdCmsRTable.hh"
#include "XrdCms/XrdCmsTrace.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysTimer.hh"
#include
using namespace XrdCms;
// Note: Debugging statements have been commented out. This is time critical
// code and debugging may only be enabled in standalone testing as the
// delays introduced by DEBUG() will usually cause timeout failures.
/******************************************************************************/
/* G l o b a l O b j e c t s & S t a t i c M e m b e r s */
/******************************************************************************/
XrdCmsRRQ XrdCms::RRQ;
XrdSysMutex XrdCmsRRQSlot::myMutex;
XrdCmsRRQSlot *XrdCmsRRQSlot::freeSlot = 0;
short XrdCmsRRQSlot::initSlot = 0;
/******************************************************************************/
/* E x t e r n a l F u n c t i o n s */
/******************************************************************************/
void *XrdCmsRRQ_StartTimeOut(void *parg) {return RRQ.TimeOut();}
void *XrdCmsRRQ_StartRespond(void *parg) {return RRQ.Respond();}
/******************************************************************************/
/* X r d C m s R R Q C l a s s M e t h o d s */
/******************************************************************************/
/******************************************************************************/
/* A d d */
/******************************************************************************/
short XrdCmsRRQ::Add(short Snum, XrdCmsRRQInfo *Info)
{
// EPNAME("RRQ Add");
XrdCmsRRQSlot *sp;
// Obtain a slot and fill it in
//
if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
// DEBUG("adding slot " <slotNum);
// If a slot number given, check if it's the right slot and it is still queued.
// If so, piggy-back this request to existing one and make a fast exit
//
myMutex.Lock(); Stats.Add2Q++;
if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
{if (Info->isLU)
{sp->LkUp = Slot[Snum].LkUp;
Slot[Snum].LkUp = sp;
} else {
sp->Cont = Slot[Snum].Cont;
Slot[Snum].Cont = sp;
}
Stats.PBack++;
myMutex.UnLock();
return Snum;
}
// Queue this slot to the pending response queue and tell the timeout scheduler
//
sp->Expire = myClock+1;
if (waitQ.Singleton()) isWaiting.Post();
waitQ.Prev()->Insert(&sp->Link);
myMutex.UnLock();
return sp->slotNum;
}
/******************************************************************************/
/* D e l */
/******************************************************************************/
void XrdCmsRRQ::Del(short Snum, const void *Key)
{
Ready(Snum, Key, 0, 0);
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
int XrdCmsRRQ::Init(int Tint, int Tdly)
{
int rc;
pthread_t tid;
// Set values
//
if (Tint) Tslice = Tint;
if (Tdly) Tdelay = Tdly;
Stats.Reset();
// Fill out the response structure
//
dataResp.Hdr.streamid = 0;
dataResp.Hdr.rrCode = kYR_data;
dataResp.Hdr.modifier = 0;
dataResp.Hdr.datalen = 0;
dataResp.Val = 0;
// Fill out the data i/o vector
//
data_iov[0].iov_base = (char *)&dataResp;
data_iov[0].iov_len = sizeof(dataResp);
data_iov[1].iov_base = databuff;;
// Fill out the response structure
//
redrResp.Hdr.streamid = 0;
redrResp.Hdr.rrCode = kYR_redirect;
redrResp.Hdr.modifier = 0;
redrResp.Hdr.datalen = 0;
redrResp.Val = 0;
// Fill out the redirect i/o vector
//
redr_iov[0].iov_base = (char *)&redrResp;
redr_iov[0].iov_len = sizeof(redrResp);
redr_iov[1].iov_base = hostbuff;;
// Fill out the wait info
//
waitResp.Hdr.streamid = 0;
waitResp.Hdr.rrCode = kYR_wait;
waitResp.Hdr.modifier = 0;
waitResp.Hdr.datalen = htons(static_cast(sizeof(waitResp.Val)));
waitResp.Val = htonl(Tdelay);
// Start the responder thread
//
if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
0, "Request Responder")))
{Say.Emsg("Config", rc, "create request responder thread");
return 1;
}
// Start the timeout thread
//
if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
0, "Request Timeout")))
{Say.Emsg("Config", rc, "create request timeout thread");
return 1;
}
// All done
//
return 0;
}
/******************************************************************************/
/* R e a d y */
/******************************************************************************/
int XrdCmsRRQ::Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
{
// EPNAME("RRQ Ready");
XrdCmsRRQSlot *sp;
// Check if it's the right slot and it is still queued.
//
myMutex.Lock();
sp = &Slot[Snum];
if (sp->Info.Key != Key || !sp->Expire)
{myMutex.UnLock();
// DEBUG("slot " <Arg1 |= mask1; sp->Arg2 = mask2;
Stats.Resp++;
// Check if we should still hold on to this slot because the number of actual
// responders is less than the number needed.
//
if (sp->Info.actR < sp->Info.minR)
{sp->Info.actR++; Stats.Multi++;
myMutex.UnLock();
return 0;
}
// Move the element from the waiting queue to the ready queue
//
sp->Link.Remove();
if (readyQ.Singleton()) isReady.Post();
readyQ.Prev()->Insert(&sp->Link);
myMutex.UnLock();
// DEBUG("readied slot " <Item(); sp->Link.Remove(); sp->Expire = 0;
myMutex.UnLock();
// A locate request can be pggy-backed on a select request and vice-versa
// We separate the two queues here as each has a different response.
//
if (sp->Info.isLU)
{if (sp->Cont)
{sp->Cont->Arg1 = sp->Arg1;
sendRedResp(sp->Cont);
}
sendLocResp(sp);
} else {
if (sp->LkUp)
{sp->LkUp->Arg1 = sp->Arg1; sp->LkUp->Arg2 = sp->Arg2;
sendLocResp(sp->LkUp);
}
sendRedResp(sp);
}
sp->Recycle();
} while(1);
} while(1);
// Keep the compiler happy
//
return (void *)0;
}
/******************************************************************************/
/* s e n d L o c R e s p */
/******************************************************************************/
void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
{
static const int ovhd = sizeof(kXR_unt32);
XrdCmsSelected *sP;
XrdCmsNode *nP;
XrdCmsCluster::CmsLSOpts lsopts;
int bytes;
bool oksel;
// Send a delay if we timed out
//
if (!(lP->Arg1))
{sendLwtResp(lP);
return;
}
// Get the list of servers that have this file. If none found, then force the
// client to wait as this should never happen and the long path is called for.
// ASAP responses always respond in with IPv6 addresses or mapped IPv4 ones.
//
lsopts = static_cast(lP->Info.lsLU);
if (!(sP = Cluster.List(lP->Arg1, lsopts, oksel))
|| (!(bytes = XrdCmsNode::do_LocFmt(databuff,sP,lP->Arg2,lP->Info.rwVec))))
{sendLwtResp(lP);
return;
}
// Complete the I/O vector
//
bytes++;
data_iov[1].iov_len = bytes;
bytes += ovhd;
dataResp.Hdr.datalen = htons(static_cast(bytes));
bytes += sizeof(dataResp.Hdr);
// Send the reply to each waiting redirector
//
RTable.Lock();
do {if ((nP = RTable.Find(lP->Info.Rnum, lP->Info.Rinst)))
{dataResp.Hdr.streamid = lP->Info.ID;
nP->Send(data_iov, iov_cnt, bytes);
}
luFast++;
} while((lP = lP->LkUp));
RTable.UnLock();
}
/******************************************************************************/
/* s e n d L w t R e s p */
/******************************************************************************/
void XrdCmsRRQ::sendLwtResp(XrdCmsRRQSlot *rP)
{
// EPNAME("sendLwtResp");
XrdCmsNode *nP;
// For each request, find the redirector and ask it to send a wait
//
RTable.Lock();
do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
{waitResp.Hdr.streamid = rP->Info.ID; luSlow++;
nP->Send((char *)&waitResp, sizeof(waitResp));
// DEBUG("Redirect delay " <Name() <<' ' <Rnum <<'.' <Rinst <<"not found");}
} while((rP = rP->LkUp));
RTable.UnLock();
}
/******************************************************************************/
/* s e n d R e d R e s p */
/******************************************************************************/
void XrdCmsRRQ::sendRedResp(XrdCmsRRQSlot *rP)
{
// EPNAME("sendRedResp");
static const int ovhd = sizeof(kXR_unt32);
XrdCmsNode *nP;
int doredir, port, hlen = 0;
// Determine where the client should be redirected
//
if ((doredir = (rP->Arg1 && Cluster.Select(rP->Arg1, port, hostbuff, hlen,
rP->Info.isRW, rP->Info.actR,
rP->Info.ifOP))))
{redrResp.Val = htonl(port);
redrResp.Hdr.datalen = htons(static_cast(hlen+ovhd));
redr_iov[1].iov_len = hlen;
hlen += ovhd + sizeof(redrResp.Hdr);
}
// For each request, find the redirector and ask it to send the message
//
RTable.Lock();
do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
{if (doredir){redrResp.Hdr.streamid = rP->Info.ID; rdFast++;
nP->Send(redr_iov, iov_cnt, hlen);
// DEBUG("Fast redirect " <Name() <<" -> " <Info.ID; rdSlow++;
nP->Send((char *)&waitResp, sizeof(waitResp));
// DEBUG("Redirect delay " <Name() <<' ' <Rnum <<'.' <Rinst <<"not found");}
} while((rP = rP->Cont));
RTable.UnLock();
}
/******************************************************************************/
/* T i m e O u t */
/******************************************************************************/
void *XrdCmsRRQ::TimeOut()
{
// EPNAME("RRQ TimeOut");
XrdCmsRRQSlot *sp;
// We measure millisecond intervals to timeout waiting requests. We used to zero
// out arg1/2 to force expiration, but they would be zero anyway if no responses
// occurred. Now with qdn we need to leave them alone as we may have defered
// a fast dispatch because we were waiting for more than one responder.
//
while(1)
{isWaiting.Wait();
myMutex.Lock();
while(1)
{myClock++;
myMutex.UnLock();
XrdSysTimer::Wait(Tslice);
myMutex.Lock();
while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
{sp->Link.Remove();
if (readyQ.Singleton()) isReady.Post();
// sp->Arg1 = 0; sp->Arg2 = 0;
// DEBUG("expired slot " <slotNum);
readyQ.Prev()->Insert(&sp->Link);
}
if (waitQ.Singleton()) break;
}
myMutex.UnLock();
}
// Keep the compiler happy
//
return (void *)0;
}
/******************************************************************************/
/* X r d C m s R R Q S l o t C l a s s M e t h o d s */
/******************************************************************************/
/******************************************************************************/
/* C o n s t r u c t o r */
/******************************************************************************/
XrdCmsRRQSlot::XrdCmsRRQSlot() : Link(this)
{
slotNum = initSlot++;
if (slotNum)
{Cont = freeSlot;
freeSlot = this;
} else Cont = 0;
Arg1 = Arg2 = 0;
Info.Key = 0;
}
/******************************************************************************/
/* A l l o c */
/******************************************************************************/
XrdCmsRRQSlot *XrdCmsRRQSlot::Alloc(XrdCmsRRQInfo *theInfo)
{
XrdCmsRRQSlot *sp;
myMutex.Lock();
if ((sp = freeSlot))
{sp->Info = *theInfo;
freeSlot = sp->Cont;
sp->Cont = 0;
sp->LkUp = 0;
sp->Arg1 = 0;
sp->Arg2 = 0;
}
myMutex.UnLock();
return sp;
}
/******************************************************************************/
/* R e c y c l e */
/******************************************************************************/
void XrdCmsRRQSlot::Recycle()
{
XrdCmsRRQSlot *sp, *np;
myMutex.Lock();
if (!Link.Singleton()) Link.Remove();
// Remove items in the lookup chain first
//
np = LkUp;
while((sp = np))
{np = sp->LkUp;
sp->Cont = freeSlot;
freeSlot = sp;
sp->Info.Key = 0;
}
// Now remove items in the select chain
//
np = Cont;
while((sp = np))
{np = sp->Cont;
sp->Cont = freeSlot;
freeSlot = sp;
sp->Info.Key = 0;
}
// Now put this item in the free chain
//
Info.Key = 0;
Cont = freeSlot;
freeSlot = this;
myMutex.UnLock();
}