/******************************************************************************/
/* */
/* X r d F r m X f r A g e n t . c c */
/* */
/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include "XrdFrc/XrdFrcRequest.hh"
#include "XrdFrc/XrdFrcTrace.hh"
#include "XrdFrc/XrdFrcUtils.hh"
#include "XrdFrm/XrdFrmConfig.hh"
#include "XrdFrm/XrdFrmXfrAgent.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "XrdSys/XrdSysPlatform.hh"
using namespace XrdFrc;
using namespace XrdFrm;
/******************************************************************************/
/* S t a t i c V a r i a b l e s */
/******************************************************************************/
XrdFrcReqAgent XrdFrmXfrAgent::GetAgent("getf", XrdFrcRequest::getQ);
XrdFrcReqAgent XrdFrmXfrAgent::MigAgent("migr", XrdFrcRequest::migQ);
XrdFrcReqAgent XrdFrmXfrAgent::StgAgent("pstg", XrdFrcRequest::stgQ);
XrdFrcReqAgent XrdFrmXfrAgent::PutAgent("putf", XrdFrcRequest::putQ);
/******************************************************************************/
/* Private: A d d */
/******************************************************************************/
void XrdFrmXfrAgent::Add(XrdOucStream &Request, char *Tok,
XrdFrcReqAgent &Server)
{
XrdFrcRequest myReq;
const char *Miss = 0;
char *tp, *op;
// Handle: op[] [. . .]
//
// op: + | & | ^ | < | = | >
//
memset(&myReq, 0, sizeof(myReq));
myReq.OPc = *Tok;
if (*Tok == '=' || *Tok == '^') myReq.Options |= XrdFrcRequest::Purge;
Tok++;
if (*Tok) strlcpy(myReq.User, Tok, sizeof(myReq.User));
else strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
if (!(tp = Request.GetToken())) Miss = "request id";
else strlcpy(myReq.ID, tp, sizeof(myReq.ID));
if (!Miss)
{if (!(tp = Request.GetToken())) Miss = "notify path";
else strlcpy(myReq.Notify, tp, sizeof(myReq.Notify));
}
if (!Miss)
{if (!(tp = Request.GetToken())) Miss = "priority";
else {myReq.Prty = atoi(tp);
if (myReq.Prty < 0) myReq.Prty = 0;
else if (myReq.Prty > XrdFrcRequest::maxPrty)
myReq.Prty = XrdFrcRequest::maxPrty;
}
}
if (!Miss)
{if (!(tp = Request.GetToken())) Miss = "mode";
else myReq.Options |= XrdFrcUtils::MapM2O(myReq.Notify, tp);
}
if (!Miss && !(tp = Request.GetToken())) Miss = "path";
// Check for any errors
//
if (Miss) {Say.Emsg("Agent_Add", Miss, "missing in '+' request.");
return;
}
// Add all paths in the request
//
do {strlcpy(myReq.LFN, tp, sizeof(myReq.LFN));
if ((op = index(tp, '?'))) {myReq.Opaque = op-tp+1; *op = '\0';}
else myReq.Opaque = 0;
myReq.LFO = 0;
if (myReq.LFN[0] != '/' && !(myReq.LFO = XrdFrcUtils::chkURL(myReq.LFN)))
Say.Emsg("Agent_Add", "Invalid url -", myReq.LFN);
else Server.Add(myReq);
if ((tp = Request.GetToken())) memset(myReq.LFN, 0, sizeof(myReq.LFN));
} while(tp);
}
/******************************************************************************/
/* Private: A g e n t */
/******************************************************************************/
XrdFrcReqAgent *XrdFrmXfrAgent::Agent(char bType)
{
// Return the agent corresponding to the type
//
switch(bType)
{case 0 : return &StgAgent;
case '+': return &StgAgent;
case '^':
case '&': return &MigAgent;
case '<': return &GetAgent;
case '=':
case '>': return &PutAgent;
default: break;
}
return 0;
}
/******************************************************************************/
/* Private: D e l */
/******************************************************************************/
void XrdFrmXfrAgent::Del(XrdOucStream &Request, char *Tok,
XrdFrcReqAgent &Server)
{
XrdFrcRequest myReq;
// If the requestid is adjacent to the operation, use it o/w get it
//
if (!(*Tok) && (!(Tok = Request.GetToken()) || !(*Tok)))
{Say.Emsg("Del", "request id missing in cancel request.");
return;
}
// Copy the request ID into the request and remove it from peer server
//
memset(&myReq, 0, sizeof(myReq));
strlcpy(myReq.ID, Tok, sizeof(myReq.ID));
Server.Del(myReq);
}
/******************************************************************************/
/* Private: L i s t */
/******************************************************************************/
void XrdFrmXfrAgent::List(XrdOucStream &Request, char *Tok)
{
XrdFrcRequest::Item Items[XrdFrcRequest::getLast];
XrdFrcReqAgent *agentP;
int n = 0;
char *tp;
while((tp = Request.GetToken()) && n < XrdFrcRequest::getLast)
{if (XrdFrcUtils::MapV2I(tp, Items[n])) n++;}
// List entries queued for specific servers
//
if (!(*Tok)) {StgAgent.List(Items, n); GetAgent.List(Items, n);}
else do {if ((agentP = Agent(*Tok))) agentP->List(Items, n);
} while(*(++Tok));
cout <] [. . .]
// Copy purge: =[] [. . .]
// Copy out: >[] [. . .]
// Migrate: &[] [. . .]
// Migr+Purge: ^[] [. . .]
// Stage: +[] [. . .]
// Cancel in: -
// Cancel out: ~
// List: ?[<][+][&][>]
// Wakeup: ![<][+][&][>]
//
if ((tp = Request.GetToken()))
switch(*tp)
{case '+': Add(Request, tp, StgAgent); break;
case '<': Add(Request, tp, GetAgent); break;
case '=':
case '>': Add(Request, tp, PutAgent); break;
case '&':
case '^': Add(Request, tp, MigAgent); break;
case '-': Del(Request, tp+1, StgAgent);
Del(Request, tp+1, GetAgent);
break;
case '~': Del(Request, tp+1, MigAgent);
Del(Request, tp+1, PutAgent);
break;
case '?': List(Request, tp+1); break;
case '!': GetAgent.Ping(tp); break;
default: Say.Emsg("Agent", "Invalid request, '", tp, "'.");
}
}
/******************************************************************************/
/* Public: S t a r t */
/******************************************************************************/
int XrdFrmXfrAgent::Start()
{
EPNAME("Agent");
XrdOucStream Request;
char *tp;
// Prepare our agents
//
if (!StgAgent.Start(Config.QPath, Config.AdminMode)
|| !MigAgent.Start(Config.QPath, Config.AdminMode)
|| !GetAgent.Start(Config.QPath, Config.AdminMode)
|| !PutAgent.Start(Config.QPath, Config.AdminMode)) return 2;
// Attach stdin to the Request stream
//
Request.Attach(STDIN_FILENO, 8*1024);
// Process all input
//
while((tp = Request.GetLine()))
{DEBUG ("Request: '" <