/******************************************************************************/
/* */
/* X r d F r m X f r D a e m o n . c c */
/* */
/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdFrc/XrdFrcRequest.hh"
#include "XrdFrc/XrdFrcTrace.hh"
#include "XrdFrc/XrdFrcUtils.hh"
#include "XrdFrm/XrdFrmConfig.hh"
#include "XrdFrm/XrdFrmMigrate.hh"
#include "XrdFrm/XrdFrmTransfer.hh"
#include "XrdFrm/XrdFrmXfrAgent.hh"
#include "XrdFrm/XrdFrmXfrDaemon.hh"
#include "XrdNet/XrdNetOpts.hh"
#include "XrdNet/XrdNetSocket.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
using namespace XrdFrc;
using namespace XrdFrm;
/******************************************************************************/
/* S t a t i c V a r i a b l e s */
/******************************************************************************/
XrdFrmReqBoss XrdFrmXfrDaemon::GetBoss("getf", XrdFrcRequest::getQ);
XrdFrmReqBoss XrdFrmXfrDaemon::MigBoss("migr", XrdFrcRequest::migQ);
XrdFrmReqBoss XrdFrmXfrDaemon::StgBoss("pstg", XrdFrcRequest::stgQ);
XrdFrmReqBoss XrdFrmXfrDaemon::PutBoss("putf", XrdFrcRequest::putQ);
/******************************************************************************/
/* Private: B o s s */
/******************************************************************************/
XrdFrmReqBoss *XrdFrmXfrDaemon::Boss(char bType)
{
// Return the boss corresponding to the type
//
switch(bType)
{case 0 :
case '+': return &StgBoss;
case '^':
case '&': return &MigBoss;
case '<': return &GetBoss;
case '=':
case '>': return &PutBoss;
default: break;
}
return 0;
}
/******************************************************************************/
/* Public: I n i t */
/******************************************************************************/
int XrdFrmXfrDaemon::Init()
{
char buff[80];
// Make sure we are the only daemon running
//
sprintf(buff, "%s/frm_xfrd.lock", Config.QPath);
if (!XrdFrcUtils::Unique(buff, Config.myProg)) return 0;
// Initiliaze the transfer processor (it need to be active now)
//
if (!XrdFrmTransfer::Init()) return 0;
// Fix up some values that might not make sense
//
if (Config.WaitMigr < Config.IdleHold) Config.WaitMigr = Config.IdleHold;
// Check if it makes any sense to migrate and, if so, initialize migration
//
if (Config.pathList)
{if (!Config.xfrOUT)
Say.Emsg("Config","Output copy command not specified; "
"auto-migration disabled!");
else XrdFrmMigrate::Migrate();
} else Say.Emsg("Config","No migratable paths; "
"auto-migration disabled!");
// Start the external interfaces
//
if (!StgBoss.Start(Config.QPath, Config.AdminMode)
|| !MigBoss.Start(Config.QPath, Config.AdminMode)
|| !GetBoss.Start(Config.QPath, Config.AdminMode)
|| !PutBoss.Start(Config.QPath, Config.AdminMode)) return 0;
// All done
//
return 1;
}
/******************************************************************************/
/* Public: P o n g */
/******************************************************************************/
void *XrdFrmXfrDaemonPong(void *parg)
{
XrdFrmXfrDaemon::Pong();
return (void *)0;
}
void XrdFrmXfrDaemon::Pong()
{
EPNAME("Pong");
static int udpFD = -1;
XrdOucStream Request(&Say);
XrdFrmReqBoss *bossP;
char *tp;
// Get a UDP socket for the server if we haven't already done so and start
// a thread to re-enter this code and wait for messages from an agent.
//
if (udpFD < 0)
{XrdNetSocket *udpSock;
pthread_t tid;
int retc;
if ((udpSock = XrdNetSocket::Create(&Say, Config.QPath,
"xfrd.udp", Config.AdminMode, XRDNET_UDPSOCKET)))
{udpFD = udpSock->Detach(); delete udpSock;
if ((retc = XrdSysThread::Run(&tid, XrdFrmXfrDaemonPong, (void *)0,
XRDSYSTHREAD_BIND, "Pong")))
Say.Emsg("main", retc, "create udp listner");
}
return;
}
// Hookup to the udp socket as a stream
//
Request.Attach(udpFD, 64*1024);
// Now simply get requests (see XrdFrmXfrDaemon for details). Here we screen
// out ping and list requests.
//
while((tp = Request.GetLine()))
{DEBUG(": '" <Wakeup(1);}
break;
default: XrdFrmXfrAgent::Process(Request);
}
}
// We should never get here (but....)
//
Say.Emsg("Server", "Lost udp connection!");
}
/******************************************************************************/
/* Public: S t a r t */
/******************************************************************************/
int XrdFrmXfrDaemon::Start()
{
// Start the ponger
//
Pong();
// Now start nudging
//
while(1)
{StgBoss.Wakeup(); GetBoss.Wakeup();
MigBoss.Wakeup(); PutBoss.Wakeup();
XrdSysTimer::Snooze(Config.WaitQChk);
}
// We should never get here
//
return 0;
}