/******************************************************************************/
/* */
/* X r d C m s A d m i n . c c */
/* */
/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include "XProtocol/XProtocol.hh"
#include "XProtocol/YProtocol.hh"
#include "XrdCms/XrdCmsAdmin.hh"
#include "XrdCms/XrdCmsConfig.hh"
#include "XrdCms/XrdCmsManager.hh"
#include "XrdCms/XrdCmsPrepare.hh"
#include "XrdCms/XrdCmsState.hh"
#include "XrdCms/XrdCmsTrace.hh"
#include "XrdNet/XrdNetSocket.hh"
#include "XrdOuc/XrdOuca2x.hh"
#include "XrdOuc/XrdOucName2Name.hh"
#include "XrdOuc/XrdOucTList.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdSys/XrdSysTimer.hh"
using namespace XrdCms;
/******************************************************************************/
/* L o c a l C l a s s e s */
/******************************************************************************/
namespace XrdCms
{
class AdminReq
{
public:
AdminReq *Next;
const char *Req;
const char *Path;
CmsRRHdr Hdr;
char *Data;
int Dlen;
static int numinQ;
static const int maxinQ = 1024;
static AdminReq *getReq() {AdminReq *arP;
do {QPresent.Wait();
QMutex.Lock();
if ((arP = First))
{if (!(First = arP->Next)) Last = 0;
numinQ--;
}
QMutex.UnLock();
} while (!arP);
return arP;
}
void Requeue() {QMutex.Lock();
Next=First; First=this; QPresent.Post(); numinQ++;
QMutex.UnLock();
}
AdminReq(const char *req, XrdCmsRRData &RRD)
: Next(0), Req(req), Path(RRD.Path ? RRD.Path : ""),
Hdr(RRD.Request), Data(RRD.Buff), Dlen(RRD.Dlen)
{RRD.Buff = 0;
QMutex.Lock();
if (Last) {Last->Next = this; Last = this;}
else First=Last = this;
QPresent.Post();
numinQ++;
QMutex.UnLock();
}
~AdminReq() {if (Data) free(Data);}
private:
static XrdSysSemaphore QPresent;
static XrdSysMutex QMutex;
static AdminReq *First;
static AdminReq *Last;
};
};
/******************************************************************************/
/* G l o b a l s & S t a t i c s */
/******************************************************************************/
XrdSysSemaphore AdminReq::QPresent(0);
XrdSysMutex AdminReq::QMutex;
AdminReq *AdminReq::First = 0;
AdminReq *AdminReq::Last = 0;
int AdminReq::numinQ= 0;
XrdOssStatInfo2_t XrdCmsAdmin::areFunc = 0;
XrdOucTList *XrdCmsAdmin::areFirst = 0;
XrdOucTList *XrdCmsAdmin::areLast = 0;
XrdSysMutex XrdCmsAdmin::areMutex;
XrdSysSemaphore XrdCmsAdmin::areSem(0);
bool XrdCmsAdmin::arePost = false;
XrdSysMutex XrdCmsAdmin::myMutex;
XrdSysSemaphore *XrdCmsAdmin::SyncUp = 0;
int XrdCmsAdmin::POnline= 0;
/******************************************************************************/
/* E x t e r n a l T h r e a d I n t e r f a c e s */
/******************************************************************************/
void *XrdCmsAdminLogin(void *carg)
{XrdCmsAdmin *Admin = new XrdCmsAdmin();
Admin->Login(*(int *)carg);
delete Admin;
return (void *)0;
}
void *XrdCmsAdminMonAds(void *carg)
{XrdCmsAdmin *Admin = (XrdCmsAdmin *)carg;
Admin->MonAds();
return (void *)0;
}
void *XrdCmsAdminMonARE(void *carg)
{XrdCmsAdmin::RelayAREvent();
return (void *)0;
}
void *XrdCmsAdminSend(void *carg)
{XrdCmsAdmin::Relay(0,0);
return (void *)0;
}
/******************************************************************************/
/* I n i t A R E v e n t s */
/******************************************************************************/
bool XrdCmsAdmin::InitAREvents(void *arFunc)
{
pthread_t tid;
// Record the function we will be using
//
areFunc = (XrdOssStatInfo2_t)arFunc;
// Start the event relay
//
if (XrdSysThread::Run(&tid,XrdCmsAdminMonARE,(void *)0))
{Say.Emsg("InitAREvents", errno, "start arevent relay");
return false;
}
// All done
//
return true;
}
/******************************************************************************/
/* L o g i n */
/******************************************************************************/
void XrdCmsAdmin::Login(int socknum)
{
const char *epname = "Admin_Login";
const char *sMsg[2] = {"temporary suspend requested by",
"long-term suspend requested by"};
char *request, *tp;
int sPerm;
// Attach the socket FD to a stream
//
Stream.Attach(socknum);
// The first request better be "login"
//
if ((request = Stream.GetLine()))
{DEBUG("initial request: '" < 0);
} while(rc < 0 && errno == EINTR);
if (rc < 0) Say.Emsg(epname, errno, "maintain contact with", pname);
else Say.Emsg(epname,"Lost contact with", pname);
CmsState.Update(XrdCmsState::FrontEnd, 0, -1);
close(sFD);
XrdSysTimer::Snooze(15);
} while(1);
}
/******************************************************************************/
/* N o t e s */
/******************************************************************************/
void *XrdCmsAdmin::Notes(XrdNetSocket *AnoteSock)
{
const char *epname = "Notes";
char *request, *tp;
int rc;
// Bind the udp socket to a stream
//
Stream.Attach(AnoteSock->Detach());
Sname = strdup("anon");
// Accept notifications in an endless loop
//
do {while((request = Stream.GetLine()))
{DEBUG("received notification: '" <= 0) close(curSock);
else if (newSock >= 0) SReady.Post();
if (newSock < 0) curSock = -1;
else {curSock = dup(newSock); XrdNetSocket::setOpts(curSock, 0);}
SMutex.UnLock();
return;
}
// This is just an endless loop
//
do {while(mySock < 0)
{SMutex.Lock();
if (curSock < 0) {SMutex.UnLock(); SReady.Wait(); SMutex.Lock();}
mySock = curSock; curSock = -1;
SMutex.UnLock();
}
do {arP = AdminReq::getReq();
if ((retc = write(mySock, &arP->Hdr, HdrSz)) != HdrSz
|| (retc = write(mySock, arP->Data, arP->Dlen)) != arP->Dlen)
retc = (retc < 0 ? errno : ECANCELED);
else {DEBUG("sent " <Req <<' ' <Path);
delete arP; retc = 0;
}
} while(retc == 0);
if (retc) Say.Emsg("AdminRelay", retc, "relay", arP->Req);
arP->Requeue();
close(mySock);
mySock = -1;
} while(1);
}
/******************************************************************************/
/* R e l a y A R E v e n t */
/******************************************************************************/
void XrdCmsAdmin::RelayAREvent()
{
EPNAME("RelayAREvent");
const char *evWhat;
XrdOucTList *evP;
int evType, mod;
// Endless loop relaying events
//
do{areMutex.Lock();
while((evP = areFirst))
{if (evP == areLast) areFirst = areLast = 0;
else areFirst = evP->next;
areMutex.UnLock();
XrdCms::CmsReqCode reqCode = static_cast(evP->ival[0]);
mod = evP->ival[1];
if (reqCode == kYR_have)
{if (mod & CmsHaveRequest::Pending)
{evType = XrdOssStatEvent::PendAdded;
evWhat = "pend ";
} else {
evType = XrdOssStatEvent::FileAdded;
evWhat = "have ";
}
} else {
evType = XrdOssStatEvent::FileRemoved;
evWhat = "gone ";
}
(*areFunc)(evP->text, 0, evType, 0, evP->text);
DEBUG("sending managers " <text);
XrdCmsManager::Inform(reqCode, mod, evP->text, strlen(evP->text)+1);
delete evP;
areMutex.Lock();
}
arePost = true;
areMutex.UnLock();
areSem.Wait();
} while(true);
}
/******************************************************************************/
/* S e n d */
/******************************************************************************/
void XrdCmsAdmin::Send(const char *Req, XrdCmsRRData &Data)
{
// AdminReq *arP;
if (AdminReq::numinQ < AdminReq::maxinQ) new AdminReq(Req, Data);
else Say.Emsg("Send", "Queue full; ignoring", Req, Data.Path);
}
/******************************************************************************/
/* S t a r t */
/******************************************************************************/
void *XrdCmsAdmin::Start(XrdNetSocket *AdminSock)
{
const char *epname = "Start";
int InSock;
pthread_t tid;
// Start the relay thread
//
if (XrdSysThread::Run(&tid,XrdCmsAdminSend,(void *)0))
Say.Emsg(epname, errno, "start admin relay");
// If we are in independent mode then let the caller continue
//
if (Config.doWait)
{if (Config.adsPort) BegAds();
else Say.Emsg(epname, "Waiting for primary server to login.");
}
else if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
// Accept connections in an endless loop
//
while(1) if ((InSock = AdminSock->Accept()) >= 0)
{XrdNetSocket::setOpts(InSock, 0);
if (XrdSysThread::Run(&tid,XrdCmsAdminLogin,(void *)&InSock))
{Say.Emsg(epname, errno, "start admin");
close(InSock);
}
} else Say.Emsg(epname, errno, "accept connection");
return (void *)0;
}
/******************************************************************************/
/* P r i v a t e M e t h o d s */
/******************************************************************************/
/******************************************************************************/
/* A d d E v e n t */
/******************************************************************************/
void XrdCmsAdmin::AddEvent(const char *path, XrdCms::CmsReqCode req, int mods)
{
int info[2] = {(int)req, mods};
XrdOucTList *evP = new XrdOucTList(path, info);
// Add the event to he queue
//
areMutex.Lock();
if (areLast) areLast->next = evP;
else areFirst = evP;
areLast = evP;
if (arePost) {areSem.Post(); arePost = false;}
areMutex.UnLock();
}
/******************************************************************************/
/* B e g A d s */
/******************************************************************************/
void XrdCmsAdmin::BegAds()
{
const char *epname = "BegAds";
pthread_t tid;
// If we don't need to monitor he alternate data server then we are all set
//
if (!Config.adsMon)
{Say.Emsg(epname, "Assuming alternate data server is functional.");
CmsState.Update(XrdCmsState::FrontEnd, 1, Config.adsPort);
if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
return;
}
// Start the connection/ping thread for the alternate data server
//
if (XrdSysThread::Run(&tid,XrdCmsAdminMonAds,(void *)this))
Say.Emsg(epname, errno, "start alternate data server monitor");
}
/******************************************************************************/
/* C h e c k V N i d */
/******************************************************************************/
bool XrdCmsAdmin::CheckVNid(const char *xNid)
{
// Check if we have a vnid but the server is supplying one or is not the same
//
if (Config.myVNID)
{if (!xNid)
{Say.Emsg("do_Login", "Warning! No xrootd vnid specified; "
"proceeding only with cmsd vnid.");
return true;
}
if (!strcmp(xNid, Config.myVNID)) return true;
std::string msg("xrootd vnid '");
msg += xNid; msg += "' does not match cmsd vnid '";
msg += Config.myVNID; msg += "'.";
Say.Emsg("do_Login", msg.c_str());
return false;
}
// We don't have a vnid, check if one is present
//
if (xNid) Say.Emsg("do_Login", "Warning! xrootd has a vnid but cmsd does "
"not; proceeding without a vnid!");
return true;
}
/******************************************************************************/
/* C o n 2 A d s */
/******************************************************************************/
int XrdCmsAdmin::Con2Ads(const char *pname)
{
const char *epname = "Con2Ads";
static ClientInitHandShake hsVal = {0, 0, 0, (int)htonl(4), (int)htonl(2012)};
static ClientLoginRequest loginReq = {{0, 0},
(kXR_unt16)htons(kXR_login),
(kXR_int32)htonl(getpid()),
{'c', 'm', 's', 'd', 0, 0, 0, 0},
0, 0, {0}, {0}, 0};
struct {kXR_int32 siHS[4];} hsRsp;
XrdNetSocket adsSocket;
int ecode, snum;
char ecnt = 10;
// Create a socket and to connect to the alternate data server
//
do{while((snum = adsSocket.Open("localhost", Config.adsPort)) < 0)
{if (ecnt >= 10)
{ecode = adsSocket.LastError();
Say.Emsg(epname, ecode, "connect to", pname);
ecnt = 1;
} else ecnt++;
XrdSysTimer::Snooze(3);
}
// Write the handshake to make sure the connection went fine
//
if (write(snum, &hsVal, sizeof(hsVal)) < 0)
{Say.Emsg(epname, errno, "send handshake to", pname);
close(snum); continue;
}
// Read the mandatory response
//
if (recv(snum, &hsRsp, sizeof(hsRsp), MSG_WAITALL) < 0)
{Say.Emsg(epname, errno, "recv handshake from", pname);
close(snum); continue;
}
// Now we need to send the login request
//
if (write(snum, &loginReq, sizeof(loginReq)) < 0)
{Say.Emsg(epname, errno, "send login to", pname);
close(snum); continue;
} else break;
} while(1);
// Indicate what we just did
//
Say.Emsg(epname, "Logged into", pname);
// We connected, so we indicate that the alternate is ok
//
myMutex.Lock();
CmsState.Update(XrdCmsState::FrontEnd, 1, Config.adsPort);
if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
myMutex.UnLock();
// All done
//
return adsSocket.Detach();
}
/******************************************************************************/
/* d o _ L o g i n */
/******************************************************************************/
int XrdCmsAdmin::do_Login()
{
std::string vnidVal;
const char *emsg;
char buff[64], *tp, Ltype = 0;
int Port = 0;
// Process: login {p | P | s | u} [port ] [nid ]
//
if (!(tp = Stream.GetToken()))
{Say.Emsg("do_Login", "login type not specified");
return 0;
}
Ltype = *tp;
if (*(tp+1) == '\0')
switch (*tp)
{case 'p': Stype = "Primary server"; break;
case 'P': Stype = "Proxy server"; break;
case 's': Stype = "Server"; break;
case 'u': Stype = "Admin"; break;
default: Ltype = 0; break;
}
if (!Ltype)
{Say.Emsg("do_Login", "Invalid login type,", tp);
return 0;
} else Ltype = *tp;
if (Config.adsPort && Ltype != 'u')
{Say.Emsg("do_login", Stype, " login rejected; configured for an "
"alternate data server.");
return 0;
}
if (!(tp = Stream.GetToken()))
{Say.Emsg("do_Login", "login name not specified");
return 0;
} else Sname = strdup(tp);
// Get any additional options
//
while((tp = Stream.GetToken()))
{ if (!strcmp(tp, "port"))
{if (!(tp = Stream.GetToken()))
{Say.Emsg("do_Login", "login port not specified");
return 0;
}
if (XrdOuca2x::a2i(Say,"login port",tp,&Port,0))
return 0;
}
else if (!strcmp(tp, "vnid"))
{if (!(tp = Stream.GetToken()))
{Say.Emsg("do_Login", "vnid value not specified");
return 0;
}
vnidVal = tp;
}
else {Say.Emsg("do_Login", "invalid login option -", tp);
return 0;
}
}
// If this is not a primary, we are done. Otherwise there is much more. We
// must make sure we are compatible with the login. Note that for alternate
// data servers we already screened out primary logins, so we will return.
//
if (Ltype != 'p' && Ltype != 'P') return 1;
if (Ltype == 'p' && Config.asProxy()) emsg = "only accepts proxies";
else if (Ltype == 'P' && !Config.asProxy()) emsg = "does not accept proxies";
else emsg = 0;
if (emsg)
{Say.Emsg("do_login", "Server login rejected; configured role", emsg);
return 0;
}
// Verify virtual networking
//
if ((vnidVal.length() || Config.myVNID)
&& !CheckVNid(vnidVal.length() ? vnidVal.c_str() : 0))
{Say.Emsg("do_login", "Server login rejected; virtual networking error.");
return 0;
}
// Discard login if this is a duplicate primary server
//
myMutex.Lock();
if (POnline)
{myMutex.UnLock();
Say.Emsg("do_Login", "Primary server already logged in; login of",
tp, "rejected.");
return 0;
}
// Indicate we have a primary
//
Primary = 1;
POnline = 1;
Relay(1, Stream.FDNum());
CmsState.Update(XrdCmsState::FrontEnd, 1, Port);
// Check if this is the first primary login and resume if we must
//
if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
myMutex.UnLock();
// Document the login
//
sprintf(buff, "logged in; data port is %d", Port);
Say.Emsg("do_Login:", Stype, Sname, buff);
return 1;
}
/******************************************************************************/
/* d o _ R m D i d */
/******************************************************************************/
void XrdCmsAdmin::do_RmDid(int isPfn)
{
const char *epname = "do_RmDid";
char *tp, *thePath, apath[XrdCmsMAX_PATH_LEN];
int rc;
if (!(tp = Stream.GetToken()))
{Say.Emsg(epname,"removed path not specified by",Stype,Sname);
return;
}
// Handle prepare queue removal
//
if (PrepQ.isOK())
{if (!isPfn && Config.lcl_N2N)
if ((rc = Config.lcl_N2N->lfn2pfn(tp, apath, sizeof(apath))))
{Say.Emsg(epname, rc, "determine pfn for removed path", tp);
thePath = 0;
} else thePath = apath;
else thePath = tp;
if (thePath) PrepQ.Gone(thePath);
}
// If we have a pfn then we must get the lfn to inform our manager about the file
//
if (isPfn && Config.lcl_N2N)
{if ((rc = Config.lcl_N2N->pfn2lfn(tp, apath, sizeof(apath))))
{Say.Emsg(epname, rc, "determine lfn for removed path", tp);
return;
} else tp = apath;
}
// Check if we are relaying remove events and, if so, vector through that.
//
if (areFunc) AddEvent(tp, kYR_gone, kYR_raw);
else {DEBUG("sending managers gone " <pfn2lfn(tp, apath, sizeof(apath))))
{Say.Emsg(epname, rc, "determine lfn for added path", tp);
return;
} else tp = apath;
}
// Check if we are relaying remove events and, if so, vector through that.
//
if (areFunc) AddEvent(tp, kYR_have, Mods);
else {DEBUG("sending managers have online " <