/* */
/* X r d C m s R R Q . c c */
/* */
/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
#include "XrdCms/XrdCmsCluster.hh"
#include "XrdCms/XrdCmsNode.hh"
#include "XrdCms/XrdCmsRRQ.hh"
#include "XrdCms/XrdCmsRTable.hh"
#include "XrdCms/XrdCmsTrace.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysTimer.hh"
using namespace XrdCms;
// Note: Debugging statements have been commented out. This is time critical
// code and debugging may only be enabled in standalone testing as the
// delays introduced by DEBUG() will usually cause timeout failures.
/* G l o b a l O b j e c t s & S t a t i c M e m b e r s */
// Global request/response queue; the thread entry points below call its
// TimeOut() and Respond() methods.
// NOTE(review): RRQ is defined before the XrdCmsRRQSlot statics. This is safe
// for initSlot/freeSlot, which are constant-initialised to 0 before any dynamic
// initialisation. It presumably matters if RRQ's construction (which appears to
// construct its Slot[] array) ever touches XrdCmsRRQSlot::myMutex -- confirm.
XrdCmsRRQ XrdCms::RRQ;
// Class-wide lock for XrdCmsRRQSlot; presumably guards the free list below.
XrdSysMutex XrdCmsRRQSlot::myMutex;
// Head of the free-slot list; slots are chained through their Cont field.
XrdCmsRRQSlot *XrdCmsRRQSlot::freeSlot = 0;
// Next slot number to assign; each XrdCmsRRQSlot constructor takes and bumps it.
short XrdCmsRRQSlot::initSlot = 0;
/* E x t e r n a l F u n c t i o n s */
// Thread entry point for the request timeout thread started by
// XrdCmsRRQ::Init(). The thread argument is not used.
void *XrdCmsRRQ_StartTimeOut(void *parg)
{
   static_cast<void>(parg);
   return RRQ.TimeOut();
}

// Thread entry point for the request responder thread started by
// XrdCmsRRQ::Init(). The thread argument is not used.
void *XrdCmsRRQ_StartRespond(void *parg)
{
   static_cast<void>(parg);
   return RRQ.Respond();
}
/* X r d C m s R R Q C l a s s M e t h o d s */
/* A d d */
// Register a request (described by *Info) for a fast response.
// Returns the slot number holding the request, or 0 if no free slot could be
// allocated. If Snum names a slot that is still pending (non-zero Expire) for
// the same Key, the new slot is chained onto it instead -- on the LkUp chain
// when Info->isLU is set, otherwise on the Cont chain -- and Snum is returned.
// NOTE(review): this copy appears to have lost lines. Block braces are
// missing. No myMutex.UnLock() is visible on either return path after the
// Lock() below. No statement links sp into waitQ despite the "Queue this
// slot" comment. Confirm against the canonical source before relying on it.
short XrdCmsRRQ::Add(short Snum, XrdCmsRRQInfo *Info)
// EPNAME("RRQ Add");
XrdCmsRRQSlot *sp;
// Obtain a slot and fill it in
if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
// DEBUG("adding slot " <slotNum);
// If a slot number given, check if it's the right slot and it is still queued.
// If so, piggy-back this request to existing one and make a fast exit
myMutex.Lock(); Stats.Add2Q++;
if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
{if (Info->isLU)
{sp->LkUp = Slot[Snum].LkUp;
Slot[Snum].LkUp = sp;
} else {
sp->Cont = Slot[Snum].Cont;
Slot[Snum].Cont = sp;
return Snum;
// Queue this slot to the pending response queue and tell the timeout scheduler
// (Expire is set relative to myClock, presumably one timeout tick from now;
// the timeout thread is woken only when waitQ goes from empty to non-empty).
sp->Expire = myClock+1;
if (waitQ.Singleton()) isWaiting.Post();
return sp->slotNum;
/* D e l */
void XrdCmsRRQ::Del(short Snum, const void *Key)
Ready(Snum, Key, 0, 0);
/* I n i t */
int XrdCmsRRQ::Init(int Tint, int Tdly)
int rc;
pthread_t tid;
// Set values
if (Tint) Tslice = Tint;
if (Tdly) Tdelay = Tdly;
// Fill out the response structure
dataResp.Hdr.streamid = 0;
dataResp.Hdr.rrCode = kYR_data;
dataResp.Hdr.modifier = 0;
dataResp.Hdr.datalen = 0;
dataResp.Val = 0;
// Fill out the data i/o vector
data_iov[0].iov_base = (char *)&dataResp;
data_iov[0].iov_len = sizeof(dataResp);
data_iov[1].iov_base = databuff;;
// Fill out the response structure
redrResp.Hdr.streamid = 0;
redrResp.Hdr.rrCode = kYR_redirect;
redrResp.Hdr.modifier = 0;
redrResp.Hdr.datalen = 0;
redrResp.Val = 0;
// Fill out the redirect i/o vector
redr_iov[0].iov_base = (char *)&redrResp;
redr_iov[0].iov_len = sizeof(redrResp);
redr_iov[1].iov_base = hostbuff;;
// Fill out the wait info
waitResp.Hdr.streamid = 0;
waitResp.Hdr.rrCode = kYR_wait;
waitResp.Hdr.modifier = 0;
waitResp.Hdr.datalen = htons(static_cast(sizeof(waitResp.Val)));
waitResp.Val = htonl(Tdelay);
// Start the responder thread
if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
0, "Request Responder")))
{Say.Emsg("Config", rc, "create request responder thread");
return 1;
// Start the timeout thread
if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
0, "Request Timeout")))
{Say.Emsg("Config", rc, "create request timeout thread");
return 1;
// All done
return 0;
/* R e a d y */
// Record a response for the request held in slot Snum, provided the slot still
// carries Key and is pending (non-zero Expire). While fewer than minR
// responders have answered, the slot is kept and 0 is returned. Otherwise the
// slot is handed to the responder thread through readyQ, and isReady is posted
// when readyQ was empty.
// NOTE(review): this copy is damaged from here through the end of Respond().
// - The "// DEBUG(\"slot \"" line appears to have swallowed the stale-slot
//   exit and the mask update (its tail still reads "Arg1 |= mask1; sp->Arg2 =
//   mask2;").
// - The "// DEBUG(\"readied slot \"" line appears to have swallowed the end of
//   Ready() and the start of Respond(); "Item(); sp->Link.Remove(); sp->Expire
//   = 0;" is Respond() code.
// - Everything from the isLU test onward belongs to Respond(), which returns
//   void*.
// - Braces and other statements are missing.
// Restore from the canonical source.
int XrdCmsRRQ::Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
// EPNAME("RRQ Ready");
XrdCmsRRQSlot *sp;
// Check if it's the right slot and it is still queued.
sp = &Slot[Snum];
if (sp->Info.Key != Key || !sp->Expire)
// DEBUG("slot " <Arg1 |= mask1; sp->Arg2 = mask2;
// Check if we should still hold on to this slot because the number of actual
// responders is less than the number needed.
if (sp->Info.actR < sp->Info.minR)
{sp->Info.actR++; Stats.Multi++;
return 0;
// Move the element from the waiting queue to the ready queue
// (the responder thread is only woken when readyQ goes from empty to
// non-empty).
if (readyQ.Singleton()) isReady.Post();
// DEBUG("readied slot " <Item(); sp->Link.Remove(); sp->Expire = 0;
// A locate request can be piggy-backed on a select request and vice-versa.
// We separate the two queues here as each has a different response; the
// selection masks of the head slot are copied to the first slot of the other
// chain.
if (sp->Info.isLU)
{if (sp->Cont)
{sp->Cont->Arg1 = sp->Arg1;
} else {
if (sp->LkUp)
{sp->LkUp->Arg1 = sp->Arg1; sp->LkUp->Arg2 = sp->Arg2;
} while(1);
} while(1);
// Keep the compiler happy
return (void *)0;
/* s e n d L o c R e s p */
void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
{
// Send a locate reply (kYR_data carrying the formatted server list) to the
// redirector of lP and of every slot chained to it through LkUp. If nothing
// responded (Arg1 still zero), or no server list could be produced, every one
// of those redirectors is told to make its client wait instead, via
// sendLwtResp().
//
   static const int ovhd = sizeof(kXR_unt32);
   XrdCmsSelected *sP;
   XrdCmsNode *nP;
   XrdCmsCluster::CmsLSOpts lsopts;
   int bytes;
   bool oksel;

// Send a delay if we timed out
//
   if (!(lP->Arg1))
      {sendLwtResp(lP);
       return;
      }

// Get the list of servers that have this file. If none found, then force the
// client to wait as this should never happen and the long path is called for.
// ASAP responses always respond with IPv6 addresses or mapped IPv4 ones.
//
   lsopts = static_cast<XrdCmsCluster::CmsLSOpts>(lP->Info.lsLU);
   if (!(sP = Cluster.List(lP->Arg1, lsopts, oksel))
   || (!(bytes = XrdCmsNode::do_LocFmt(databuff,sP,lP->Arg2,lP->Info.rwVec))))
      {sendLwtResp(lP);
       return;
      }

// Complete the I/O vector. The header length covers the payload plus the
// 4-byte Val field (ovhd); the total sent also includes the header itself.
//
   data_iov[1].iov_len = bytes;
   bytes += ovhd;
   dataResp.Hdr.datalen = htons(static_cast<unsigned short>(bytes));
   bytes += sizeof(dataResp.Hdr);

// Send the reply to each waiting redirector
// NOTE(review): confirm whether RTable.Find() returns the node locked; if it
// does, each successful Find() needs a matching nP->UnLock() after Send().
//
   do {if ((nP = RTable.Find(lP->Info.Rnum, lP->Info.Rinst)))
          {dataResp.Hdr.streamid = lP->Info.ID;
           nP->Send(data_iov, iov_cnt, bytes);
          }
      } while((lP = lP->LkUp));
}
/* s e n d L w t R e s p */
// Tell the redirector of rP, and of every slot chained to it through LkUp, to
// make its client wait: the canned kYR_wait reply (carrying Tdelay, built in
// Init()) is sent with each request's stream id, and luSlow is bumped per send.
// NOTE(review): the commented-out DEBUG line near the end appears to have
// swallowed code (its "<Name() ... <Rnum" text is a fusion of two debug lines).
// The if-body's closing brace is also missing, since the "}" on that line is
// inside the comment. Whether RTable.Find() returns the node locked (requiring
// nP->UnLock()) cannot be confirmed from here.
void XrdCmsRRQ::sendLwtResp(XrdCmsRRQSlot *rP)
// EPNAME("sendLwtResp");
XrdCmsNode *nP;
// For each request, find the redirector and ask it to send a wait
do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
{waitResp.Hdr.streamid = rP->Info.ID; luSlow++;
nP->Send((char *)&waitResp, sizeof(waitResp));
// DEBUG("Redirect delay " <Name() <<' ' <Rnum <<'.' <Rinst <<"not found");}
} while((rP = rP->LkUp));
/* s e n d R e d R e s p */
// Answer the redirector of rP, and of every slot chained to it through Cont.
// When responders were recorded (Arg1 non-zero) and Cluster.Select() picks a
// host, each redirector receives a kYR_redirect reply naming hostbuff and port
// (rdFast is bumped). Otherwise it apparently receives the canned kYR_wait
// reply (rdSlow is bumped).
// NOTE(review): this copy is damaged.
// - The Cluster.Select(...) argument list is cut off after
//   "rP->Info.actR,", together with the rest of that condition.
// - The "Fast redirect" DEBUG line appears to have swallowed the end of the
//   redirect branch and the else header.
// - Braces are missing.
// Restore from the canonical source.
void XrdCmsRRQ::sendRedResp(XrdCmsRRQSlot *rP)
// EPNAME("sendRedResp");
static const int ovhd = sizeof(kXR_unt32);
XrdCmsNode *nP;
int doredir, port, hlen = 0;
// Determine where the client should be redirected
if ((doredir = (rP->Arg1 && Cluster.Select(rP->Arg1, port, hostbuff, hlen,
rP->Info.isRW, rP->Info.actR,
{redrResp.Val = htonl(port);
redrResp.Hdr.datalen = htons(static_cast(hlen+ovhd));
redr_iov[1].iov_len = hlen;
// hlen now becomes the total bytes sent: host name + Val (ovhd) + header.
hlen += ovhd + sizeof(redrResp.Hdr);
// For each request, find the redirector and ask it to send the message
do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
{if (doredir){redrResp.Hdr.streamid = rP->Info.ID; rdFast++;
nP->Send(redr_iov, iov_cnt, hlen);
// DEBUG("Fast redirect " <Name() <<" -> " <Info.ID; rdSlow++;
nP->Send((char *)&waitResp, sizeof(waitResp));
// DEBUG("Redirect delay " <Name() <<' ' <Rnum <<'.' <Rinst <<"not found");}
} while((rP = rP->Cont));
/* T i m e O u t */
// Timeout thread body (started through XrdCmsRRQ_StartTimeOut). It scans waitQ
// for slots whose Expire is older than myClock and posts isReady when readyQ
// was empty, presumably after moving each expired slot onto readyQ for the
// responder thread.
// NOTE(review): this copy is incomplete.
// - The enclosing loops, the wait on isWaiting, the advance of myClock and the
//   statements that unlink/relink each expired slot are not visible.
// - The final "break" has no enclosing loop here.
// Restore from the canonical source.
void *XrdCmsRRQ::TimeOut()
// EPNAME("RRQ TimeOut");
XrdCmsRRQSlot *sp;
// We measure millisecond intervals to timeout waiting requests. We used to zero
// out arg1/2 to force expiration, but they would be zero anyway if no responses
// occurred. Now with qdn we need to leave them alone as we may have deferred
// a fast dispatch because we were waiting for more than one responder.
while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
if (readyQ.Singleton()) isReady.Post();
// sp->Arg1 = 0; sp->Arg2 = 0;
// DEBUG("expired slot " <slotNum);
if (waitQ.Singleton()) break;
// Keep the compiler happy
return (void *)0;
/* X r d C m s R R Q S l o t C l a s s M e t h o d s */
/* C o n s t r u c t o r */
XrdCmsRRQSlot::XrdCmsRRQSlot() : Link(this)
{
// Slots are numbered in construction order from the static initSlot counter.
// Slot 0 is never placed on the free list, so slot number 0 can mean "no slot"
// (XrdCmsRRQ::Add() returns 0 when allocation fails). Every other slot is
// pushed onto the head of the free list, which is chained through Cont.
//
   slotNum = initSlot++;
   if (slotNum)
      {Cont     = freeSlot;
       freeSlot = this;
      } else Cont = 0;

// Clear all remaining per-request state explicitly. Add() and Ready() test
// Expire, and Recycle() walks LkUp, so neither may be left indeterminate; the
// zero-initialisation they previously relied on only applies to objects with
// static storage duration.
//
   LkUp   = 0;
   Expire = 0;
   Arg1   = Arg2 = 0;
   Info.Key = 0;
}
/* A l l o c */
XrdCmsRRQSlot *XrdCmsRRQSlot::Alloc(XrdCmsRRQInfo *theInfo)
{
// Take a slot from the shared free list and initialise it for a new request
// described by *theInfo (chains and selection masks cleared). Returns 0 when
// no slot is free.
//
// The free list (freeSlot) is static and is also pushed onto by Recycle(), so
// it is only touched while holding the class-wide myMutex.
//
   XrdCmsRRQSlot *sp;

   myMutex.Lock();
   if ((sp = freeSlot))
      {sp->Info = *theInfo;
       freeSlot = sp->Cont;
       sp->Cont = 0;
       sp->LkUp = 0;
       sp->Arg1 = 0;
       sp->Arg2 = 0;
      }
   myMutex.UnLock();
   return sp;
}
/* R e c y c l e */
void XrdCmsRRQSlot::Recycle()
XrdCmsRRQSlot *sp, *np;
if (!Link.Singleton()) Link.Remove();
// Remove items in the lookup chain first
np = LkUp;
while((sp = np))
{np = sp->LkUp;
sp->Cont = freeSlot;
freeSlot = sp;
sp->Info.Key = 0;
// Now remove items in the select chain
np = Cont;
while((sp = np))
{np = sp->Cont;
sp->Cont = freeSlot;
freeSlot = sp;
sp->Info.Key = 0;
// Now put this item in the free chain
Info.Key = 0;
Cont = freeSlot;
freeSlot = this;