/******************************************************************************/ /* */ /* X r d C m s S t a t e . c c */ /* */ /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #include #include #include "XProtocol/YProtocol.hh" #include "Xrd/XrdLink.hh" #include "XrdCms/XrdCmsManager.hh" #include "XrdCms/XrdCmsRTable.hh" #include "XrdCms/XrdCmsState.hh" #include "XrdCms/XrdCmsTrace.hh" #include "XrdSys/XrdSysError.hh" using namespace XrdCms; /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ XrdCmsState XrdCms::CmsState; /******************************************************************************/ /* C o n s t r u c t o r */ /******************************************************************************/ XrdCmsState::XrdCmsState() : mySemaphore(0) { minNodeCnt = 1; numActive = 0; numStaging = 0; currState = All_NoStage | All_Suspend; prevState = 0; Suspended = All_Suspend; NoStaging = All_NoStage; feOK = 0; noSpace = 0; adminNoStage = 0; adminSuspend = 0; NoStageFile = ""; SuspendFile = ""; isMan = 0; dataPort = 0; Enabled = 0; } /******************************************************************************/ /* Punlic E n a b l e */ /******************************************************************************/ void XrdCmsState::Enable() { struct stat buff; // Set correct admin staging state // Update(Stage, stat(NoStageFile, &buff)); // Set correct admin suspend state // Update(Active, stat(SuspendFile, &buff)); // We will force the information to be sent to interested parties by making // the previous state different from the current state and enabling ourselves. // myMutex.Lock(); Enabled = 1; prevState = ~currState; mySemaphore.Post(); myMutex.UnLock(); } /******************************************************************************/ /* Public M o n i t o r */ /******************************************************************************/ void *XrdCmsState::Monitor() { CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}}; int RTsend, theState, Changes, myPort; // Do this forever (we are only posted when finally enabled) // do {mySemaphore.Wait(); myMutex.Lock(); Changes = currState ^ prevState; theState = currState; prevState = currState; myPort = dataPort; myMutex.UnLock(); if (Changes && (myStatus.Hdr.modifier = Status(Changes, theState))) {if (myStatus.Hdr.modifier & CmsStatusRequest::kYR_Resume) {myStatus.Hdr.streamid = htonl(myPort); RTsend = 1;} else {myStatus.Hdr.streamid = 0; RTsend = (isMan > 0 ? (theState & SRV_Suspend) : 0); } if (isMan && RTsend) RTable.Send("status", (char *)&myStatus, sizeof(myStatus)); XrdCmsManager::Inform(myStatus.Hdr); } } while(1); // All done // return (void *)0; } /******************************************************************************/ /* Public P o r t */ /******************************************************************************/ int XrdCmsState::Port() { int xPort; myMutex.Lock(); xPort = dataPort; myMutex.UnLock(); return xPort; } /******************************************************************************/ /* Public s e n d S t a t e */ /******************************************************************************/ void XrdCmsState::sendState(XrdLink *lp) { CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}}; myMutex.Lock(); myStatus.Hdr.modifier = Suspended ? CmsStatusRequest::kYR_Suspend : CmsStatusRequest::kYR_Resume; myStatus.Hdr.modifier |= NoStaging ? CmsStatusRequest::kYR_noStage : CmsStatusRequest::kYR_Stage; lp->Send((char *)&myStatus.Hdr, sizeof(myStatus.Hdr)); myMutex.UnLock(); } /******************************************************************************/ /* Public S e t */ /******************************************************************************/ void XrdCmsState::Set(int ncount) { // Set the node count (this requires a lock) // myMutex.Lock(); minNodeCnt = ncount; myMutex.UnLock(); } /******************************************************************************/ void XrdCmsState::Set(int ncount, int isman, const char *AdminPath) { char fnbuff[1048]; int i; // This is a configuration call no locks are required. // minNodeCnt = ncount; isMan = isman; i = strlen(AdminPath); strcpy(fnbuff, AdminPath); if (AdminPath[i-1] != '/') fnbuff[i++] = '/'; strcpy(fnbuff+i, "NOSTAGE"); NoStageFile = strdup(fnbuff); strcpy(fnbuff+i, "SUSPEND"); SuspendFile = strdup(fnbuff); } /******************************************************************************/ /* Private S t a t u s */ /******************************************************************************/ unsigned char XrdCmsState::Status(int Changes, int theState) { const char *SRstate = 0, *SNstate = 0; unsigned char rrModifier; // Check for suspend changes // if (Changes & All_Suspend) if (theState & All_Suspend) {rrModifier = CmsStatusRequest::kYR_Suspend; SRstate = "suspended"; } else { rrModifier = CmsStatusRequest::kYR_Resume; SRstate = "active"; } else rrModifier = 0; // Check for staging changes // if (Changes & All_NoStage) {if (theState & All_NoStage) {rrModifier |= CmsStatusRequest::kYR_noStage; SNstate = "+ nostaging"; } else { rrModifier |= CmsStatusRequest::kYR_Stage; SNstate = "+ staging"; } } // Report and return status // if (rrModifier) {if (!SRstate && SNstate) SNstate += 2; Say.Emsg("State", "Status changed to", SRstate, SNstate); } return rrModifier; } /******************************************************************************/ /* Public U p d a t e */ /******************************************************************************/ void XrdCmsState::Update(StateType StateT, int ActivCnt, int StageCnt) { EPNAME("Update"); const char *What; char newVal; // Create new state // myMutex.Lock(); switch(StateT) {case Active: if ((newVal = ActivCnt ? 0 : 1) != adminSuspend) { if ( newVal && !StageCnt) unlink(SuspendFile); else if (!newVal || !StageCnt) unlink(SuspendFile); else close(open(SuspendFile, O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR)); adminSuspend = newVal; } What = "Active"; break; case Counts: numStaging += StageCnt; numActive += ActivCnt; What = "Counts"; break; case FrontEnd: if ((feOK = (ActivCnt ? 1 : 0)) && StageCnt >= 0) dataPort = StageCnt; What = "FrontEnd"; break; case Space: noSpace = (ActivCnt ? 0 : 1); What = "Space"; break; case Stage: if ((newVal = ActivCnt ? 0 : 1) != adminNoStage) {if (newVal) unlink(NoStageFile); else close(open(NoStageFile, O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR)); adminNoStage = newVal; } What = "Stage"; break; default: Say.Emsg("State", "Invalid state update"); What = "Unknown"; break; } DEBUG(What <<" Parm1=" <