// @(#)root/proofplayer:$Id$ // Author: Jan Iwaszkiewicz 11/12/06 /************************************************************************* * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * * For the list of contributors see $ROOTSYS/README/CREDITS. * *************************************************************************/ /** \class TPacketizerAdaptive \ingroup proofkernel This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structures. Two main improvements in the load-balancing strategy: - First one was to change the order in which the files are assigned to the computing nodes in such a way that network transfers are evenly distributed in the query time. Transfer of the remote files was often becoming a bottleneck at the end of a query. - The other improvement is the use of time-based packet size. We measure the processing rate of all the nodes and calculate the packet size, so that it takes certain amount of time. In this way packetizer prevents the situation where the query can't finish because of one slow node. The data structures: TFileStat, TFileNode and TSlaveStat are enriched + changed and TFileNode::Compare method is changed. */ #include "TPacketizerAdaptive.h" #include "Riostream.h" #include "TDSet.h" #include "TError.h" #include "TEnv.h" #include "TEntryList.h" #include "TEventList.h" #include "TMap.h" #include "TMessage.h" #include "TMonitor.h" #include "TNtupleD.h" #include "TObject.h" #include "TParameter.h" #include "TPerfStats.h" #include "TProofDebug.h" #include "TProof.h" #include "TProofServ.h" #include "TSlave.h" #include "TSocket.h" #include "TSortedList.h" #include "TUrl.h" #include "TClass.h" #include "TRandom.h" #include "TMath.h" #include "TObjString.h" #include "TList.h" // // The following three utility classes manage the state of the // work to be performed and the slaves involved in the process. 
// A list of TFileNode(s) describes the hosts with files, each // has a list of TFileStat(s) keeping the state for each TDSet // element (file). // // The list of TSlaveStat(s) keep track of the work (being) done // by each slave // //------------------------------------------------------------------------------ class TPacketizerAdaptive::TFileStat : public TObject { private: Bool_t fIsDone; // is this element processed TFileNode *fNode; // my FileNode TDSetElement *fElement; // location of the file and its range Long64_t fNextEntry; // cursor in the range, -1 when done // needs changing public: TFileStat(TFileNode *node, TDSetElement *elem, TList *file); Bool_t IsDone() const {return fIsDone;} Bool_t IsSortable() const { return kTRUE; } void SetDone() {fIsDone = kTRUE;} TFileNode *GetNode() const {return fNode;} TDSetElement *GetElement() const {return fElement;} Long64_t GetNextEntry() const {return fNextEntry;} void MoveNextEntry(Long64_t step) {fNextEntry += step;} // This method is used to keep a sorted list of remaining files to be processed Int_t Compare(const TObject* obj) const { // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal // and 1 if elem.entries < obj.elem.entries. const TFileStat *fst = dynamic_cast(obj); if (fst && GetElement() && fst->GetElement()) { Long64_t ent = GetElement()->GetNum(); Long64_t entfst = fst->GetElement()->GetNum(); if (ent > 0 && entfst > 0) { if (ent > entfst) { return 1; } else if (ent < entfst) { return -1; } else { return 0; } } } // No info: assume equal (no change in order) return 0; } void Print(Option_t * = 0) const { // Notify file name and entries Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---", fElement ? 
fElement->GetNum() : -1); } }; TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files) : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst()) { // Constructor: add to the global list if (files) files->Add(this); } //------------------------------------------------------------------------------ // a class describing a file node as a part of a session class TPacketizerAdaptive::TFileNode : public TObject { private: TString fNodeName; // FQDN of the node TList *fFiles; // TDSetElements (files) stored on this node TObject *fUnAllocFileNext; // cursor in fFiles TList *fActFiles; // files with work remaining TObject *fActFileNext; // cursor in fActFiles Int_t fMySlaveCnt; // number of slaves running on this node // (which can process remote files) Int_t fExtSlaveCnt; // number of external slaves processing // files on this node Int_t fRunSlaveCnt; // total number of slaves processing files // on this node Long64_t fProcessed; // number of events processed on this node Long64_t fEvents; // number of entries in files on this node Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed (owned by TPacketizer) public: TFileNode(const char *name, Int_t strategy, TSortedList *files); ~TFileNode() { delete fFiles; delete fActFiles; } void IncMySlaveCnt() { fMySlaveCnt++; } Int_t GetMySlaveCnt() const { return fMySlaveCnt; } void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; } void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); } Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; } void IncRunSlaveCnt() { fRunSlaveCnt++; } void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); } Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; } Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; } Int_t 
GetNumberOfActiveFiles() const { return fActFiles->GetSize(); } Bool_t IsSortable() const { return kTRUE; } Int_t GetNumberOfFiles() { return fFiles->GetSize(); } void IncProcessed(Long64_t nEvents) { fProcessed += nEvents; } Long64_t GetProcessed() const { return fProcessed; } void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; } // this method is used by Compare() it adds 1, so it returns a number that // would be true if one more slave is added. Long64_t GetEventsLeftPerSlave() const { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); } void IncEvents(Long64_t nEvents) { fEvents += nEvents; } const char *GetName() const { return fNodeName.Data(); } Long64_t GetNEvents() const { return fEvents; } void Print(Option_t * = 0) const { TFileStat *fs = 0; TDSetElement *e = 0; Int_t nn = 0; Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"); Printf("+++ TFileNode: %s +++", fNodeName.Data()); Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents); Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt); Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0); if (fFiles && fFiles->GetSize() > 0) { TIter nxf(fFiles); while ((fs = (TFileStat *) nxf())) { if ((e = fs->GetElement())) { Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(), e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry()); } else { Printf("+++ #%d: no element! ", ++nn); } } } Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0); if (fActFiles && fActFiles->GetSize() > 0) { TIter nxaf(fActFiles); while ((fs = (TFileStat *) nxaf())) { if ((e = fs->GetElement())) { Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(), e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry()); } else { Printf("+++ #%d: no element! 
", ++nn); } } } Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"); } void Add(TDSetElement *elem, Bool_t tolist) { TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0; TFileStat *f = new TFileStat(this, elem, files); fFiles->Add(f); if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First(); } TFileStat *GetNextUnAlloc() { TObject *next = fUnAllocFileNext; if (next != 0) { // make file active fActFiles->Add(next); if (fActFileNext == 0) fActFileNext = fActFiles->First(); // move cursor fUnAllocFileNext = fFiles->After(fUnAllocFileNext); } return (TFileStat *) next; } TFileStat *GetNextActive() { TObject *next = fActFileNext; if (fActFileNext != 0) { fActFileNext = fActFiles->After(fActFileNext); if (fActFileNext == 0) fActFileNext = fActFiles->First(); } return (TFileStat *) next; } void RemoveActive(TFileStat *file) { if (fActFileNext == file) fActFileNext = fActFiles->After(file); fActFiles->Remove(file); if (fFilesToProcess) fFilesToProcess->Remove(file); if (fActFileNext == 0) fActFileNext = fActFiles->First(); } Int_t Compare(const TObject *other) const { // Must return -1 if this is smaller than obj, 0 if objects are equal // and 1 if this is larger than obj. // smaller means more needing a new worker. // Two cases are considered depending on // relation between harddrive speed and network bandwidth. const TFileNode *obj = dynamic_cast(other); if (!obj) { Error("Compare", "input is not a TPacketizer::TFileNode object"); return 0; } // how many more events it has than obj if (fStrategy == 1) { // The default adaptive strategy. 
Int_t myVal = GetRunSlaveCnt(); Int_t otherVal = obj->GetRunSlaveCnt(); if (myVal < otherVal) { return -1; } else if (myVal > otherVal) { return 1; } else { // if this has more events to process than obj if ((fEvents - fProcessed) > (obj->GetNEvents() - obj->GetProcessed())) { return -1; } else { return 1; } } } else { Int_t myVal = GetSlaveCnt(); Int_t otherVal = obj->GetSlaveCnt(); if (myVal < otherVal) { return -1; } else if (myVal > otherVal) { return 1; } else { return 0; } } } void Reset() { fUnAllocFileNext = fFiles->First(); fActFiles->Clear(); fActFileNext = 0; fExtSlaveCnt = 0; fMySlaveCnt = 0; fRunSlaveCnt = 0; } }; TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files) : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0), fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0), fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0), fStrategy(strategy), fFilesToProcess(files) { // Constructor fFiles->SetOwner(); fActFiles->SetOwner(kFALSE); } //------------------------------------------------------------------------------ class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat { friend class TPacketizerAdaptive; private: TFileNode *fFileNode; // corresponding node or 0 TFileStat *fCurFile; // file currently being processed TDSetElement *fCurElem; // TDSetElement currently being processed Long64_t fCurProcessed; // events processed in the current file Float_t fCurProcTime; // proc time spent on the current file TList *fDSubSet; // packets processed by this worker public: TSlaveStat(TSlave *slave); ~TSlaveStat(); TFileNode *GetFileNode() const { return fFileNode; } Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; } Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; } TFileStat *GetCurFile() { return fCurFile; } void SetFileNode(TFileNode *node) { fFileNode = node; } void UpdateRates(TProofProgressStatus *st); Float_t 
GetAvgRate() { return fStatus->GetRate(); } Float_t GetCurRate() { return (fCurProcTime?fCurProcessed/fCurProcTime:0); } Int_t GetLocalEventsLeft() { return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; } TList *GetProcessedSubSet() { return fDSubSet; } TProofProgressStatus *GetProgressStatus() { return fStatus; } TProofProgressStatus *AddProcessed(TProofProgressStatus *st = 0); }; //////////////////////////////////////////////////////////////////////////////// /// Constructor TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave) : fFileNode(0), fCurFile(0), fCurElem(0), fCurProcessed(0), fCurProcTime(0) { fDSubSet = new TList(); fDSubSet->SetOwner(); fSlave = slave; fStatus = new TProofProgressStatus(); // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS // for non existing names fWrkFQDN = slave->GetName(); if (strcmp(slave->ClassName(), "TSlaveLite")) { fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN(); // Get full name for local hosts if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1") fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN(); } PDB(kPacketizer, 2) Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data()); } //////////////////////////////////////////////////////////////////////////////// /// Cleanup TPacketizerAdaptive::TSlaveStat::~TSlaveStat() { SafeDelete(fDSubSet); SafeDelete(fStatus); } //////////////////////////////////////////////////////////////////////////////// /// Update packetizer rates void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st) { if (!st) { Error("UpdateRates", "no status object!"); return; } if (fCurFile->IsDone()) { fCurProcTime = 0; fCurProcessed = 0; } else { fCurProcTime += st->GetProcTime() - GetProcTime(); fCurProcessed += st->GetEntries() - GetEntriesProcessed(); } fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed()); st->SetLastEntries(st->GetEntries() - fStatus->GetEntries()); SafeDelete(fStatus); fStatus = st; } 
//////////////////////////////////////////////////////////////////////////////// /// Add the current element to the fDSubSet (subset processed by this worker) /// and if the status arg is given, then change the size of the packet. /// return the difference (*st - *fStatus) TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st) { if (st && fDSubSet && fCurElem) { if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed()) fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed()); fDSubSet->Add(fCurElem); TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus); return diff; } else { Error("AddProcessed", "processed subset of current elem undefined"); return 0; } } //------------------------------------------------------------------------------ ClassImp(TPacketizerAdaptive); //////////////////////////////////////////////////////////////////////////////// /// Constructor TPacketizerAdaptive::TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num, TList *input, TProofProgressStatus *st) : TVirtualPacketizer(input, st) { PDB(kPacketizer,1) Info("TPacketizerAdaptive", "enter (first %lld, num %lld)", first, num); // Init pointer members fSlaveStats = 0; fUnAllocated = 0; fActive = 0; fFileNodes = 0; fMaxPerfIdx = 1; fCachePacketSync = kTRUE; fMaxEntriesRatio = 2.; fMaxSlaveCnt = -1; fPacketAsAFraction = 4; fStrategy = 1; fFilesToProcess = new TSortedList; fFilesToProcess->SetOwner(kFALSE); if (!fProgressStatus) { Error("TPacketizerAdaptive", "No progress status"); return; } // Attempt to synchronize the packet size with the tree cache size Int_t cpsync = -1; if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) { // Check if there is a global cache-packet sync setting cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1); } if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? 
kTRUE : kFALSE; // Max file entries to avg allowed ratio for cache-to-packet synchronization // (applies only if fCachePacketSync is true; -1. disables the bound) if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) { // Check if there is a global ratio setting fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.); } // The possibility to change packetizer strategy to the basic TPacketizer's // one (in which workers always process their local data first). Int_t strategy = -1; if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) { // Check if there is a global strategy setting strategy = gEnv->GetValue("Packetizer.Strategy", 1); } if (strategy == 0) { fStrategy = 0; Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer"); } else if (strategy != 1) { Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy); } Long_t maxSlaveCnt = 0; if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) { if (maxSlaveCnt < 0) { Info("TPacketizerAdaptive", "The value of PROOF_MaxSlavesPerNode must be positive"); maxSlaveCnt = 0; } } else { // Try also with Int_t (recently supported in TProof::SetParameter) Int_t mxslcnt = -1; if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) { if (mxslcnt < 0) { Info("TPacketizerAdaptive", "The value of PROOF_MaxSlavesPerNode must be positive"); mxslcnt = 0; } maxSlaveCnt = (Long_t) mxslcnt; } } if (!maxSlaveCnt) maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0); if (maxSlaveCnt > 0) { fMaxSlaveCnt = maxSlaveCnt; Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld", fMaxSlaveCnt); } // if forceLocal parameter is set to 1 then eliminate the cross-worker // processing; // This minimizes the network usage on the PROOF cluser at the expense of // longer jobs processing times. // To process successfully the session must have workers with all the data! 
fForceLocal = kFALSE; Int_t forceLocal = 0; if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) { if (forceLocal == 1) fForceLocal = kTRUE; else Info("TPacketizerAdaptive", "The only accepted value of PROOF_ForceLocal parameter is 1 !"); } // Below we provide a possibility to change the way packet size is // calculated or define the packet time directly. // fPacketAsAFraction can be interpreted as follows: // packet time is (expected job proc. time) / fPacketSizeAsAFraction. // It substitutes 20 in the old formula to calculate the fPacketSize: // fPacketSize = fTotalEntries / (20 * nslaves) Int_t packetAsAFraction = 0; if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) { if (packetAsAFraction > 0) { fPacketAsAFraction = packetAsAFraction; Info("TPacketizerAdaptive", "using alternate fraction of query time as a packet size: %d", packetAsAFraction); } else Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0"); } // Packet re-assignement fTryReassign = 0; Int_t tryReassign = 0; if (TProof::GetParameter(input, "PROOF_TryReassign", tryReassign) != 0) tryReassign = gEnv->GetValue("Packetizer.TryReassign", 0); fTryReassign = tryReassign; if (fTryReassign != 0) Info("TPacketizerAdaptive", "failed packets will be re-assigned"); // Save the config parameters in the dedicated list so that they will be saved // in the outputlist and therefore in the relevant TQueryResult fConfigParams->Add(new TParameter("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync)); fConfigParams->Add(new TParameter("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio)); fConfigParams->Add(new TParameter("PROOF_PacketizerStrategy", fStrategy)); fConfigParams->Add(new TParameter("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt)); fConfigParams->Add(new TParameter("PROOF_ForceLocal", (Int_t)fForceLocal)); fConfigParams->Add(new TParameter("PROOF_PacketAsAFraction", fPacketAsAFraction)); Double_t baseLocalPreference = 1.2; 
fBaseLocalPreference = (Float_t)baseLocalPreference; if (TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference) == 0) fBaseLocalPreference = (Float_t)baseLocalPreference; fFileNodes = new TList; fFileNodes->SetOwner(); fUnAllocated = new TList; fUnAllocated->SetOwner(kFALSE); fActive = new TList; fActive->SetOwner(kFALSE); fValid = kTRUE; // Resolve end-point urls to optmize distribution // dset->Lookup(); // moved to TProofPlayerRemote::Process // Read list of mounted disks TObjArray *partitions = 0; TString partitionsStr; if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0) partitionsStr = gEnv->GetValue("Packetizer.Partitions", ""); if (!partitionsStr.IsNull()) { Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data()); partitions = partitionsStr.Tokenize(","); } // Split into per host and disk entries dset->Reset(); TDSetElement *e; while ((e = (TDSetElement*)dset->Next())) { if (e->GetValid()) continue; // The dataset name, if any if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet())) fDataSet = e->GetDataSet(); TUrl url = e->GetFileName(); PDB(kPacketizer,2) Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl()); // Map non URL filenames to dummy host TString host; if ( !url.IsValid() || (strncmp(url.GetProtocol(),"root", 4) && strncmp(url.GetProtocol(),"file", 4)) ) { host = "no-host"; } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) { host = "localhost"; url.SetProtocol("root"); } else { host = url.GetHostFQDN(); } // Get full name for local hosts if (host.Contains("localhost") || host == "127.0.0.1") { url.SetHost(gSystem->HostName()); host = url.GetHostFQDN(); } // Find on which disk is the file, if any TString disk; if (partitions) { TIter iString(partitions); TObjString* os = 0; while ((os = (TObjString *)iString())) { // Compare begining of the url with disk mountpoint if (strncmp(url.GetFile(), os->GetName(), 
os->GetString().Length()) == 0) { disk = os->GetName(); break; } } } // Node's url TString nodeStr; if (disk.IsNull()) nodeStr.Form("%s://%s", url.GetProtocol(), host.Data()); else nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data()); TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr); if (node == 0) { node = new TFileNode(nodeStr, fStrategy, fFilesToProcess); fFileNodes->Add(node); PDB(kPacketizer,2) Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data()); } else { PDB(kPacketizer,2) Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data()); } node->Add(e, kFALSE); } fSlaveStats = new TMap; fSlaveStats->SetOwner(kFALSE); TSlave *slave; TIter si(slaves); while ((slave = (TSlave*) si.Next())) { fSlaveStats->Add( slave, new TSlaveStat(slave) ); fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ? slave->GetPerfIdx() : fMaxPerfIdx; } // Setup file & filenode structure Reset(); // Optimize the number of files to be open when running on subsample Int_t validateMode = 0; Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode); Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE; if (num > -1) PDB(kPacketizer,2) Info("TPacketizerAdaptive", "processing subset of entries: validating by file? %s", byfile ? "yes": "no"); ValidateFiles(dset, slaves, num, byfile); if (!fValid) return; // apply global range (first,num) to dset and rebuild structure // ommitting TDSet elements that are not needed Int_t files = 0; fTotalEntries = 0; fUnAllocated->Clear(); // avoid dangling pointers fActive->Clear(); fFileNodes->Clear(); // then delete all objects PDB(kPacketizer,2) Info("TPacketizerAdaptive", "processing range: first %lld, num %lld", first, num); dset->Reset(); Long64_t cur = 0; while (( e = (TDSetElement*)dset->Next())) { // Skip invalid or missing file; It will be moved // from the dset to the 'MissingFiles' list in the player. 
if (!e->GetValid()) continue; TUrl url = e->GetFileName(); Long64_t eFirst = e->GetFirst(); Long64_t eNum = e->GetNum(); PDB(kPacketizer,2) Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName()); PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList()); if (!e->GetEntryList()) { // This element is before the start of the global range, skip it if (cur + eNum < first) { cur += eNum; PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> skip element cur %lld", cur); continue; } // This element is after the end of the global range, skip it if (num != -1 && (first+num <= cur)) { cur += eNum; PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> drop element cur %lld", cur); continue; // break ?? } Bool_t inRange = kFALSE; if (cur <= first || (num != -1 && (first+num <= cur+eNum))) { if (cur <= first) { // If this element contains the start of the global range // adjust its start and number of entries e->SetFirst( eFirst + (first - cur) ); e->SetNum( e->GetNum() - (first - cur) ); PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld", eFirst + (first - cur), first + num - cur); inRange = kTRUE; } if (num != -1 && (first+num <= cur+eNum)) { // If this element contains the end of the global range // adjust its number of entries e->SetNum( first + num - e->GetFirst() - cur ); PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur); inRange = kTRUE; } } else { // Increment the counter ... 
PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum); cur += eNum; } // Re-adjust eNum and cur, if needed if (inRange) { cur += eNum; eNum = e->GetNum(); } } else { TEntryList *enl = dynamic_cast(e->GetEntryList()); if (enl) { eNum = enl->GetN(); PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum); } else { TEventList *evl = dynamic_cast(e->GetEntryList()); eNum = evl ? evl->GetN() : eNum; PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl); } if (!eNum) { PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> empty entry- or event-list element!"); continue; } } PDB(kPacketizer,2) Info("TPacketizerAdaptive", " --> next cur %lld", cur); // Map non URL filenames to dummy host TString host; if ( !url.IsValid() || (strncmp(url.GetProtocol(),"root", 4) && strncmp(url.GetProtocol(),"file", 4)) ) { host = "no-host"; } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) { host = "localhost"; url.SetProtocol("root"); } else { host = url.GetHostFQDN(); } // Get full name for local hosts if (host.Contains("localhost") || host == "127.0.0.1") { url.SetHost(gSystem->HostName()); host = url.GetHostFQDN(); } // Find, on which disk is the file TString disk; if (partitions) { TIter iString(partitions); TObjString* os = 0; while ((os = (TObjString *)iString())) { // Compare begining of the url with disk mountpoint if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) { disk = os->GetName(); break; } } } // Node's url TString nodeStr; if (disk.IsNull()) nodeStr.Form("%s://%s", url.GetProtocol(), host.Data()); else nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data()); TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr); if (node == 0) { node = new TFileNode(nodeStr, fStrategy, fFilesToProcess); fFileNodes->Add( node ); PDB(kPacketizer, 2) Info("TPacketizerAdaptive", " --> creating new node '%s' 
for element", nodeStr.Data()); } else { PDB(kPacketizer, 2) Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data()); } ++files; fTotalEntries += eNum; node->Add(e, kTRUE); node->IncEvents(eNum); PDB(kPacketizer,2) e->Print("a"); } PDB(kPacketizer,1) Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts", fTotalEntries, files, fFileNodes->GetSize()); // Set the total number for monitoring if (gPerfStats) gPerfStats->SetNumEvents(fTotalEntries); Reset(); InitStats(); if (!fValid) SafeDelete(fProgress); PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return"); } //////////////////////////////////////////////////////////////////////////////// /// Destructor. TPacketizerAdaptive::~TPacketizerAdaptive() { if (fSlaveStats) { fSlaveStats->DeleteValues(); } SafeDelete(fSlaveStats); SafeDelete(fUnAllocated); SafeDelete(fActive); SafeDelete(fFileNodes); SafeDelete(fFilesToProcess); } //////////////////////////////////////////////////////////////////////////////// /// (re)initialise the statistics /// called at the begining or after a worker dies. 
void TPacketizerAdaptive::InitStats() { // calculating how many files from TDSet are not cached on // any slave Int_t noRemoteFiles = 0; fNEventsOnRemLoc = 0; Int_t totalNumberOfFiles = 0; TIter next(fFileNodes); while (TFileNode *fn = (TFileNode*)next()) { totalNumberOfFiles += fn->GetNumberOfFiles(); if (fn->GetMySlaveCnt() == 0) { noRemoteFiles += fn->GetNumberOfFiles(); fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed()); } } if (totalNumberOfFiles == 0) { Info("InitStats", "no valid or non-empty file found: setting invalid"); // No valid files: set invalid and return fValid = kFALSE; return; } fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles; Info("InitStats", "fraction of remote files %f", fFractionOfRemoteFiles); if (!fValid) SafeDelete(fProgress); PDB(kPacketizer,1) Info("InitStats", "return"); } //////////////////////////////////////////////////////////////////////////////// /// Get next unallocated file from 'node' or other nodes: /// First try 'node'. If there is no more files, keep trying to /// find an unallocated file on other nodes. 
TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
{
   TFileStat *file = 0;

   if (node != 0) {
      // A specific node was requested: take its next unallocated file,
      // dropping the node from the unallocated list when it is exhausted
      PDB(kPacketizer, 2)
         Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
      file = node->GetNextUnAlloc();
      if (file == 0) RemoveUnAllocNode(node);
   } else {
      if (nodeHostName && strlen(nodeHostName) > 0) {
         // A host name was given: scan the (sorted) unallocated node list
         // for a node living on that host
         TFileNode *fn;
         // Make sure that they are in the corrected order
         fUnAllocated->Sort();
         PDB(kPacketizer,2) fUnAllocated->Print();
         // Loop over unallocated fileNode list
         for (int i = 0; i < fUnAllocated->GetSize(); i++) {
            if ((fn = (TFileNode *) fUnAllocated->At(i))) {
               TUrl uu(fn->GetName());
               PDB(kPacketizer, 2)
                  Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
               // Check, whether node's hostname is matching with current fileNode (fn)
               if (!strcmp(nodeHostName, uu.GetHost())) {
                  node = fn;
                  // Fetch next unallocated file from this node
                  if ((file = node->GetNextUnAlloc()) == 0) {
                     // Node exhausted: drop it and keep scanning
                     RemoveUnAllocNode(node);
                     node = 0;
                  } else {
                     PDB(kPacketizer, 2)
                        Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
                     break;
                  }
               }
            } else {
               Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
            }
         }
         if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
            // Unlike in TPacketizer we look at the number of ext slaves only.
            PDB(kPacketizer,1)
               Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
            node = 0;
         }
      }
      if (node == 0) {
         // No host match (or none requested): walk the nodes in Compare()
         // order until one yields an unallocated file
         while (file == 0 && ((node = NextNode()) != 0)) {
            PDB(kPacketizer, 2)
               Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
            if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
         }
      }
   }

   if (file != 0) {
      // if needed make node active
      if (fActive->FindObject(node) == 0) {
         fActive->Add(node);
      }
   }

   PDB(kPacketizer, 2) {
      if (!file) {
         Info("GetNextUnAlloc", "no file found!");
      } else {
         file->Print();
      }
   }

   return file;
}

////////////////////////////////////////////////////////////////////////////////
/// Get next node which has unallocated files.
/// the order is determined by TFileNode::Compare

TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
{
   // Sorting puts the node most in need of a new worker first
   fUnAllocated->Sort();
   PDB(kPacketizer,2) {
      fUnAllocated->Print();
   }

   TFileNode *fn = (TFileNode*) fUnAllocated->First();
   if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
      // unlike in TPacketizer we look at the number of ext slaves only.
      PDB(kPacketizer,1)
         Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
      fn = 0;
   }

   return fn;
}

////////////////////////////////////////////////////////////////////////////////
/// Remove unallocated node.

void TPacketizerAdaptive::RemoveUnAllocNode(TFileNode * node)
{
   fUnAllocated->Remove(node);
}

////////////////////////////////////////////////////////////////////////////////
/// Get next active file.

TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
{
   TFileNode *node;
   TFileStat *file = 0;

   // Walk active nodes (in Compare() order) until one yields an active file
   while (file == 0 && ((node = NextActiveNode()) != 0)) {
      file = node->GetNextActive();
      if (file == 0) RemoveActiveNode(node);
   }

   return file;
}

////////////////////////////////////////////////////////////////////////////////
/// Get next active node.

TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
{
   fActive->Sort();
   PDB(kPacketizer,2) {
      Info("NextActiveNode", "enter");
      fActive->Print();
   }

   TFileNode *fn = (TFileNode*) fActive->First();
   // look at only ext slaves
   if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
      PDB(kPacketizer,1)
         Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
      fn = 0;
   }

   return fn;
}

////////////////////////////////////////////////////////////////////////////////
/// Remove file from the list of actives.

void TPacketizerAdaptive::RemoveActive(TFileStat *file)
{
   TFileNode *node = file->GetNode();

   node->RemoveActive(file);
   // Drop the node too once it has no active files left
   if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
}

////////////////////////////////////////////////////////////////////////////////
/// Remove node from the list of actives.

void TPacketizerAdaptive::RemoveActiveNode(TFileNode *node)
{
   fActive->Remove(node);
}

////////////////////////////////////////////////////////////////////////////////
/// Reset the internal data structure for packet distribution.
void TPacketizerAdaptive::Reset()
{
   // Rebuild the unallocated list from scratch: all file nodes are
   // unallocated again
   fUnAllocated->Clear();
   fUnAllocated->AddAll(fFileNodes);

   // No node is active anymore
   fActive->Clear();

   // Reset the per-node counters and file cursors
   TIter files(fFileNodes);
   TFileNode *fn;
   while ((fn = (TFileNode*) files.Next()) != 0) {
      fn->Reset();
   }

   // Re-establish the worker -> local-file-node association
   TIter slaves(fSlaveStats);
   TObject *key;
   while ((key = slaves.Next()) != 0) {
      TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
      if (!slstat) {
         Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
         continue;
      }
      // Find out which file nodes are on the worker machine and assign the
      // one with less workers assigned
      TFileNode *fnmin = 0;
      Int_t fncnt = fSlaveStats->GetSize();   // upper bound for the minimum search
      files.Reset();
      while ((fn = (TFileNode*) files.Next()) != 0) {
         // Match by host name: the node is local to this worker
         if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
            if (fn->GetMySlaveCnt() < fncnt) {
               fnmin = fn;
               fncnt = fn->GetMySlaveCnt();
            }
         }
      }
      if (fnmin != 0 ) {
         // Assign the least-loaded local node to this worker
         slstat->SetFileNode(fnmin);
         fnmin->IncMySlaveCnt();
         PDB(kPacketizer, 2)
            Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
                 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
      }
      // Worker starts with no current file
      slstat->fCurFile = 0;
   }
}

////////////////////////////////////////////////////////////////////////////////
/// Check existence of file/dir/tree and get number of entries.
/// Assumes the files have been setup.
void TPacketizerAdaptive::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile) { TMap slaves_by_sock; TMonitor mon; TList workers; // Setup the communication infrastructure workers.AddAll(slaves); TIter si(slaves); TSlave *slm; while ((slm = (TSlave*)si.Next()) != 0) { PDB(kPacketizer,3) Info("ValidateFiles","socket added to monitor: %p (%s)", slm->GetSocket(), slm->GetName()); mon.Add(slm->GetSocket()); slaves_by_sock.Add(slm->GetSocket(), slm); } mon.DeActivateAll(); ((TProof*)gProof)->DeActivateAsyncInput(); // Some monitoring systems (TXSocketHandler) need to know this ((TProof*)gProof)->fCurrentMonitor = &mon; // Identify the type if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree); // Preparing for client notification TString msg("Validating files"); UInt_t n = 0; UInt_t tot = dset->GetListOfElements()->GetSize(); Bool_t st = kTRUE; Long64_t totent = 0, nopenf = 0; while (kTRUE) { // send work while (TSlave *s = (TSlave *)workers.First()) { workers.Remove(s); // find a file TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s); if (!slstat) { Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName()); continue; } TFileNode *node = 0; TFileStat *file = 0; // try its own node first if ((node = slstat->GetFileNode()) != 0) { PDB(kPacketizer,3) node->Print(); file = GetNextUnAlloc(node); if (file == 0) slstat->SetFileNode(0); } // look for a file on any other node if necessary if (file == 0) file = GetNextUnAlloc(); if (file != 0) { // files are done right away RemoveActive(file); slstat->fCurFile = file; TDSetElement *elem = file->GetElement(); Long64_t entries = elem->GetEntries(kTRUE, kFALSE); if (entries < 0 || strlen(elem->GetTitle()) <= 0) { // This is decremented when we get the reply file->GetNode()->IncExtSlaveCnt(slstat->GetName()); TMessage m(kPROOF_GETENTRIES); m << dset->IsTree() << TString(elem->GetFileName()) << TString(elem->GetDirectory()) << TString(elem->GetObjName()); 
s->GetSocket()->Send( m ); mon.Activate(s->GetSocket()); PDB(kPacketizer,2) Info("ValidateFiles", "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s", s->GetOrdinal(), s->GetName(), s->GetSocket(), dset->IsTree() ? "tree" : "objects", elem->GetFileName(), elem->GetDirectory(), elem->GetObjName()); } else { // Fill the info elem->SetTDSetOffset(entries); if (entries > 0) { // Most likely valid elem->SetValid(); if (!elem->GetEntryList()) { if (elem->GetFirst() > entries) { Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s", elem->GetFirst(), entries, elem->GetFileName()); // disable element slstat->fCurFile->SetDone(); elem->Invalidate(); dset->SetBit(TDSet::kSomeInvalid); } if (elem->GetNum() == -1) { elem->SetNum(entries - elem->GetFirst()); } else if (elem->GetFirst() + elem->GetNum() > entries) { Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of" " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(), entries, elem->GetFileName()); elem->SetNum(entries - elem->GetFirst()); } PDB(kPacketizer,2) Info("ValidateFiles", "found elem '%s' with %lld entries", elem->GetFileName(), entries); } } // Count totent += entries; nopenf++; // Notify the client n++; gProof->SendDataSetStatus(msg, n, tot, st); // This worker is ready for the next validation workers.Add(s); } } } // Check if there is anything to wait for if (mon.GetActive() == 0) { if (byfile && maxent > 0) { // How many files do we still need ? 
Long64_t nrestf = (maxent - totent) * nopenf / totent ; if (nrestf <= 0 && maxent > totent) nrestf = 1; if (nrestf > 0) { PDB(kPacketizer,3) Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files", maxent, totent, nopenf, nrestf); si.Reset(); while ((slm = (TSlave *) si.Next()) && nrestf--) { workers.Add(slm); } continue; } else { PDB(kPacketizer,3) Info("ValidateFiles", "no need to validate more files"); break; } } else { break; } } PDB(kPacketizer,3) { Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive()); TList *act = mon.GetListOfActives(); TIter next(act); while (TSocket *s = (TSocket*) next()) { TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s); if (sl) Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName()); } delete act; } TSocket *sock = mon.Select(); // If we have been interrupted break if (!sock) { Error("ValidateFiles", "selection has been interrupted - STOP"); mon.DeActivateAll(); fValid = kFALSE; break; } mon.DeActivate(sock); PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock); TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock ); if (!sock->IsValid()) { // A socket got invalid during validation Error("ValidateFiles", "worker-%s (%s) got invalid - STOP", slave->GetOrdinal(), slave->GetName()); ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation"); fValid = kFALSE; break; } TMessage *reply; if (sock->Recv(reply) <= 0) { // Notify Error("ValidateFiles", "Recv failed! for worker-%s (%s)", slave->GetOrdinal(), slave->GetName()); // Help! lost a slave? ('slave' is deleted inside here ...) 
((TProof*)gProof)->MarkBad(slave, "receive failed during validation"); fValid = kFALSE; continue; } if (reply->What() != kPROOF_GETENTRIES) { // Not what we want: handover processing to the central machinery Int_t what = reply->What(); ((TProof*)gProof)->HandleInputMessage(slave, reply); if (what == kPROOF_FATAL) { Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)", slave->GetOrdinal(), slave->GetName()); fValid = kFALSE; } else { // Reactivate the socket mon.Activate(sock); } // Get next message continue; } TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave ); TDSetElement *e = slavestat->fCurFile->GetElement(); slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName()); Long64_t entries; (*reply) >> entries; // Extract object name, if there if ((reply->BufferSize() > reply->Length())) { TString objname; (*reply) >> objname; e->SetTitle(objname); } e->SetTDSetOffset(entries); if (entries > 0) { // This dataset element is most likely valid e->SetValid(); if (!e->GetEntryList()) { if (e->GetFirst() > entries) { Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s", e->GetFirst(), entries, e->GetFileName()); // Invalidate the element slavestat->fCurFile->SetDone(); e->Invalidate(); dset->SetBit(TDSet::kSomeInvalid); } if (e->GetNum() == -1) { e->SetNum(entries - e->GetFirst()); } else if (e->GetFirst() + e->GetNum() > entries) { Error("ValidateFiles", "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s", e->GetNum(), e->GetFirst(), entries, e->GetFileName()); e->SetNum(entries - e->GetFirst()); } } // Count totent += entries; nopenf++; // Notify the client n++; gProof->SendDataSetStatus(msg, n, tot, st); } else { Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() ); // // Need to fix this with a user option to allow incomplete file sets (rdm) // //fValid = kFALSE; // all element must be readable! 
if (gProofServ) { TMessage m(kPROOF_MESSAGE); m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName())); gProofServ->GetSocket()->Send(m); } // invalidate element e->Invalidate(); dset->SetBit(TDSet::kSomeInvalid); } PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent); // Ready for the next job, unless we have enough files if (maxent < 0 || ((totent < maxent) && !byfile)) workers.Add(slave); } // report std. output from slaves?? ((TProof*)gProof)->ActivateAsyncInput(); // This needs to be reset ((TProof*)gProof)->fCurrentMonitor = 0; // No reason to continue if invalid if (!fValid) return; // compute the offset for each file element Long64_t offset = 0; Long64_t newOffset = 0; TIter next(dset->GetListOfElements()); TDSetElement *el; while ( (el = dynamic_cast (next())) ) { if (el->GetValid()) { newOffset = offset + el->GetTDSetOffset(); el->SetTDSetOffset(offset); offset = newOffset; } } } //////////////////////////////////////////////////////////////////////////////// /// The result depends on the fStrategy Int_t TPacketizerAdaptive::CalculatePacketSize(TObject *slStatPtr, Long64_t cachesz, Int_t learnent) { Long64_t num; if (fStrategy == 0) { // TPacketizer's heuristic for starting packet size // Constant packet size; Int_t nslaves = fSlaveStats->GetSize(); if (nslaves > 0) { num = fTotalEntries / (fPacketAsAFraction * nslaves); } else { num = 1; } } else { // The dynamic heuristic for setting the packet size (default) // Calculates the packet size based on performance of this slave // and estimated time left until the end of the query. 
TSlaveStat* slstat = (TSlaveStat*)slStatPtr; Float_t rate = slstat->GetCurRate(); if (!rate) rate = slstat->GetAvgRate(); if (rate) { // Global average rate Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize())); Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction; // Bytes-to-Event conversion Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.; // Make sure it is not smaller then the cache, if the info is available and the size // synchronization is required. But apply the cache-packet size synchronization only if there // are enough left files to process and the files are all of similar sizes. Otherwise we risk // to not exploit optimally all potentially active workers. Bool_t cpsync = fCachePacketSync; if (fMaxEntriesRatio > 0. && cpsync) { if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) { Long64_t remEntries = fTotalEntries - GetEntriesProcessed(); Long64_t maxEntries = -1; if (fFilesToProcess->Last()) { TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement(); if (elem) maxEntries = elem->GetNum(); } if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) { PDB(kPacketizer,3) { Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal()); Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)", slstat->GetOrdinal(), fFilesToProcess->GetSize(), (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio); } cpsync = kFALSE; } } } if (bevt > 0. && cachesz > 0 && cpsync) { if ((Long64_t)(rate * packetTime * bevt) < cachesz) packetTime = cachesz / bevt / rate; } // Apply min-max again, if required if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime; if (fMinPacketTime > 0. 
&& packetTime < fMinPacketTime) packetTime = fMinPacketTime; // Translate the packet length in number of entries num = (Long64_t)(rate * packetTime); // Notify PDB(kPacketizer,2) Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld", slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(), packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num); } else { // First packet for this worker in this query // Twice the learning phase num = (learnent > 0) ? 5 * learnent : 1000; // Notify PDB(kPacketizer,2) Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num); } } if (num < 1) num = 1; return num; } //////////////////////////////////////////////////////////////////////////////// /// To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS /// message (when the worker was asked to stop processing during a packet). /// returns the #entries intended in the last packet - #processed entries Int_t TPacketizerAdaptive::AddProcessed(TSlave *sl, TProofProgressStatus *status, Double_t latency, TList **listOfMissingFiles) { // find slave TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl ); if (!slstat) { Error("AddProcessed", "%s: TSlaveStat instance for worker %s not found!", (sl ? sl->GetOrdinal() : "x.x"), (sl ? 
sl->GetName() : "**undef**")); return -1; } // update stats & free old element if ( slstat->fCurElem != 0 ) { Long64_t expectedNumEv = slstat->fCurElem->GetNum(); // Calculate the number of events processed in the last packet Long64_t numev; if (status && status->GetEntries() > 0) numev = status->GetEntries() - slstat->GetEntriesProcessed(); else numev = 0; // Calculate the progress made in the last packet TProofProgressStatus *progress = 0; if (numev > 0) { // This also moves the pointer in the corrsponding TFileInfo progress = slstat->AddProcessed(status); if (progress) { (*fProgressStatus) += *progress; // update processing rate slstat->UpdateRates(status); } } else { progress = new TProofProgressStatus(); } if (progress) { PDB(kPacketizer,2) Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld", sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency, progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead()); if (gPerfStats) gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(), progress->GetEntries(), latency, progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead()); delete progress; } if (numev != expectedNumEv) { // The last packet was not fully processed // and will be split in two: // - The completed part was marked as done. // - Create a new packet with the part to be resubmitted. TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem)); if (newPacket && numev < expectedNumEv) { Long64_t first = newPacket->GetFirst(); newPacket->SetFirst(first + numev); if (ReassignPacket(newPacket, listOfMissingFiles) == -1) SafeDelete(newPacket); } else Error("AddProcessed", "%s: processed too much? (%lld, %lld)", sl->GetOrdinal(), numev, expectedNumEv); // TODO: a signal handler which will send info from the worker // after a packet fails. /* Add it to the failed packets list. 
if (!fFailedPackets) { fFailedPackets = new TList(); } fFailedPackets->Add(slstat->fCurElem); */ } slstat->fCurElem = 0; return (expectedNumEv - numev); } else { // the kPROOF_STOPPRPOCESS message is send after the worker receives zero // as the reply to kPROOF_GETNEXTPACKET return -1; } } //////////////////////////////////////////////////////////////////////////////// /// Get next packet; /// A meaningfull difference to TPacketizer is the fact that this /// packetizer, for each worker, tries to predict whether the worker /// will finish processing it's local files before the end of the query. /// If yes, it allocates, to those workers, files from non-slave filenodes /// or from slaves that are overloaded. The check is done every time a new /// file needs to be assigned. TDSetElement *TPacketizerAdaptive::GetNextPacket(TSlave *sl, TMessage *r) { if ( !fValid ) { return 0; } // find slave TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl ); if (!slstat) { Error("GetNextPacket", "TSlaveStat instance for worker %s not found!", (sl ? sl->GetName() : "**undef**")); return 0; } // Attach to current file TFileStat *file = slstat->fCurFile; // Update stats & free old element Bool_t firstPacket = kFALSE; Long64_t cachesz = -1; Int_t learnent = -1; if ( slstat->fCurElem != 0 ) { Long64_t restEntries = 0; Double_t latency, proctime, proccpu; TProofProgressStatus *status = 0; Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE; if (sl->GetProtocol() > 18) { (*r) >> latency; (*r) >> status; if (sl->GetProtocol() > 25) { (*r) >> cachesz >> learnent; if (r->BufferSize() > r->Length()) (*r) >> restEntries; } fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE; fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? 
kTRUE : kFALSE; } else { Long64_t bytesRead = -1; (*r) >> latency >> proctime >> proccpu; // only read new info if available if (r->BufferSize() > r->Length()) (*r) >> bytesRead; if (r->BufferSize() > r->Length()) (*r) >> restEntries; Long64_t totev = 0; if (r->BufferSize() > r->Length()) (*r) >> totev; status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu); fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE; } if (!fileNotOpen && !fileCorrupted) { if (AddProcessed(sl, status, latency) != 0) Error("GetNextPacket", "%s: the worker processed a different # of entries", sl->GetOrdinal()); if (fProgressStatus->GetEntries() >= fTotalEntries) { if (fProgressStatus->GetEntries() > fTotalEntries) Error("GetNextPacket", "%s: processed too many entries! (%lld, %lld)", sl->GetOrdinal(), fProgressStatus->GetEntries(), fTotalEntries); // Send last timer message and stop the timer HandleTimer(0); SafeDelete(fProgress); } } else { if (file) { if (file->GetElement()) { if (fileCorrupted) { Info("GetNextPacket", "%s: file '%s' turned corrupted: invalidating file (%lld)", sl->GetOrdinal(), file->GetElement()->GetName(), restEntries); Int_t nunproc = AddProcessed(sl, status, latency); PDB(kPacketizer,1) Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc); // Remaining to be processed Long64_t num = 0; if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) { // Add the remainign entries in the packet to the ones already registered num = file->GetElement()->GetEntries() + restEntries; } else { // First call: add the remaining entries in the packet and those of the file // not yet assigned Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry(); num = restEntries + rest; } file->GetElement()->SetEntries(num); PDB(kPacketizer,1) Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(), file->GetElement()->GetName(), file->GetElement()->GetEntries()); // Flag as corrupted 
file->GetElement()->SetBit(TDSetElement::kCorrupted); } else { Info("GetNextPacket", "%s: file '%s' could not be open: invalidating related element", sl->GetOrdinal(), file->GetElement()->GetName()); } // Invalidate the element file->GetElement()->Invalidate(); // Add it to the failed packets list if (!fFailedPackets) fFailedPackets = new TList(); if (!fFailedPackets->FindObject(file->GetElement())) fFailedPackets->Add(file->GetElement()); } // Deactivate this TFileStat file->SetDone(); RemoveActive(file); } else { Info("GetNextPacket", "%s: error raised by worker, but TFileStat object invalid:" " protocol error?", sl->GetOrdinal()); } } } else { firstPacket = kTRUE; } if ( fStop ) { HandleTimer(0); return 0; } TString nodeName; if (file != 0) nodeName = file->GetNode()->GetName(); TString nodeHostName(slstat->GetName()); PDB(kPacketizer,3) Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'", sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data()); // If current file is just finished if ( file != 0 && file->IsDone() ) { file->GetNode()->DecExtSlaveCnt(slstat->GetName()); file->GetNode()->DecRunSlaveCnt(); if (gPerfStats) gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(), file->GetElement()->GetFileName(), kFALSE); file = 0; } // Reset the current file field slstat->fCurFile = file; Long64_t avgEventsLeftPerSlave = (fTotalEntries - fProgressStatus->GetEntries()) / fSlaveStats->GetSize(); if (fTotalEntries == fProgressStatus->GetEntries()) return 0; // Get a file if needed if ( file == 0) { // Needs a new file Bool_t openLocal; // Aiming for localPreference == 1 when #local == #remote events left Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc / (0.4 *(fTotalEntries - fProgressStatus->GetEntries()))); if ( slstat->GetFileNode() != 0 ) { // Local file node exists and has more events to process. 
fUnAllocated->Sort(); TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First(); Bool_t nonLocalNodePossible; if (fForceLocal) nonLocalNodePossible = 0; else nonLocalNodePossible = firstNonLocalNode ? (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt)) : 0; openLocal = !nonLocalNodePossible; Float_t slaveRate = slstat->GetAvgRate(); if ( nonLocalNodePossible && fStrategy == 1) { // OpenLocal is set to kFALSE if ( slstat->GetFileNode()->GetRunSlaveCnt() > slstat->GetFileNode()->GetMySlaveCnt() - 1 ) // External slaves help slstat -> don't open nonlocal files // -1 because, at this point slstat is not running openLocal = kTRUE; else if ( slaveRate == 0 ) { // first file for this slave // GetLocalEventsLeft() counts the potential slave // as running on its fileNode. if ( slstat->GetLocalEventsLeft() * localPreference > (avgEventsLeftPerSlave)) openLocal = kTRUE; else if ( (firstNonLocalNode->GetEventsLeftPerSlave()) < slstat->GetLocalEventsLeft() * localPreference ) openLocal = kTRUE; else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 ) openLocal = kTRUE; else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 ) openLocal = kTRUE; } else { // At this point slstat has a non zero avg rate > 0 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate; // And thus fCumProcTime, fProcessed > 0 Float_t avgTime = avgEventsLeftPerSlave /(fProgressStatus->GetEntries()/GetCumProcTime()); if (slaveTime * localPreference > avgTime) openLocal = kTRUE; else if ((firstNonLocalNode->GetEventsLeftPerSlave()) < slstat->GetLocalEventsLeft() * localPreference) openLocal = kTRUE; } } if (openLocal || fStrategy == 0) { // Try its own node file = slstat->GetFileNode()->GetNextUnAlloc(); if (!file) file = slstat->GetFileNode()->GetNextActive(); if ( file == 0 ) { // No more files on this worker slstat->SetFileNode(0); } } } // Try to find an unused filenode first if(file == 0 && !fForceLocal) file = GetNextUnAlloc(0, nodeHostName); // Then look at 
the active filenodes if(file == 0 && !fForceLocal) file = GetNextActive(); if (file == 0) return 0; PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print(); slstat->fCurFile = file; // if remote and unallocated file if (file->GetNode()->GetMySlaveCnt() == 0 && file->GetElement()->GetFirst() == file->GetNextEntry()) { fNEventsOnRemLoc -= file->GetElement()->GetNum(); if (fNEventsOnRemLoc < 0) { Error("GetNextPacket", "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!", fNEventsOnRemLoc); return 0; } } file->GetNode()->IncExtSlaveCnt(slstat->GetName()); file->GetNode()->IncRunSlaveCnt(); if (gPerfStats) gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(), file->GetElement()->GetFileName(), kTRUE); } Long64_t num = CalculatePacketSize(slstat, cachesz, learnent); // Get a packet TDSetElement *base = file->GetElement(); Long64_t first = file->GetNextEntry(); Long64_t last = base->GetFirst() + base->GetNum(); // If the remaining part is smaller than the (packetsize * 1.5) // then increase the packetsize if ( first + num * 1.5 >= last ) { num = last - first; file->SetDone(); // done // Delete file from active list (unalloc list is single pass, no delete needed) RemoveActive(file); } // Update NextEntry in the file object file->MoveNextEntry(num); slstat->fCurElem = CreateNewPacket(base, first, num); if (base->GetEntryList()) slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num); // Flag the first packet of a new run (dataset) if (firstPacket) slstat->fCurElem->SetBit(TDSetElement::kNewRun); else slstat->fCurElem->ResetBit(TDSetElement::kNewRun); PDB(kPacketizer,2) Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num); return slstat->fCurElem; } //////////////////////////////////////////////////////////////////////////////// /// Return the number of workers still processing Int_t TPacketizerAdaptive::GetActiveWorkers() { Int_t actw = 0; 
TIter nxw(fSlaveStats); TObject *key; while ((key = nxw())) { TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key); if (wrkstat && wrkstat->fCurFile) actw++; } // Done return actw; } //////////////////////////////////////////////////////////////////////////////// /// Get Estimation of the current rate; just summing the current rates of /// the active workers Float_t TPacketizerAdaptive::GetCurrentRate(Bool_t &all) { all = kTRUE; // Loop over the workers Float_t currate = 0.; if (fSlaveStats && fSlaveStats->GetSize() > 0) { TIter nxw(fSlaveStats); TObject *key; while ((key = nxw()) != 0) { TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key); if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) { // Sum-up the current rates currate += slstat->GetProgressStatus()->GetCurrentRate(); } else { all = kFALSE; } } } // Done return currate; } //////////////////////////////////////////////////////////////////////////////// /// Get estimation for the number of processed entries and bytes read at time t, /// based on the numbers already processed and the latests worker measured speeds. /// If t <= 0 the current time is used. /// Only the estimation for the entries is currently implemented. /// This is needed to smooth the instantaneous rate plot. Int_t TPacketizerAdaptive::GetEstEntriesProcessed(Float_t t, Long64_t &ent, Long64_t &bytes, Long64_t &calls) { // Default value ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); // Parse option if (fUseEstOpt == kEstOff) // Do not use estimation return 0; Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE; TTime tnow = gSystem->Now(); Double_t now = (t > 0) ? 
(Double_t)t : Long64_t(tnow) / (Double_t)1000.; Double_t dt = -1; // Loop over the workers Bool_t all = kTRUE; Float_t trate = 0.; if (fSlaveStats && fSlaveStats->GetSize() > 0) { ent = 0; TIter nxw(fSlaveStats); TObject *key; while ((key = nxw()) != 0) { TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key); if (slstat) { // Those surely processed Long64_t e = slstat->GetEntriesProcessed(); if (e <= 0) all = kFALSE; // Time elapsed since last update dt = now - slstat->GetProgressStatus()->GetLastUpdate(); // Add estimated entries processed since last update Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate() : slstat->GetAvgRate(); trate += rate; // Add estimated entries processed since last update e += (Long64_t) (dt * rate); // Add to the total ent += e; // Notify PDB(kPacketizer,3) Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld", slstat->fSlave->GetOrdinal(), slstat->GetEntriesProcessed(), rate, dt, e); } } } // Notify dt = now - fProgressStatus->GetLastUpdate(); PDB(kPacketizer,2) Info("GetEstEntriesProcessed", "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)", dt, ent, GetEntriesProcessed(), bytes, trate, all); // Check values ent = (ent > 0) ? ent : fProgressStatus->GetEntries(); ent = (ent <= fTotalEntries) ? ent : fTotalEntries; bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead(); // Done return ((all) ? 0 : 1); } //////////////////////////////////////////////////////////////////////////////// /// This method can be called at any time during processing /// as an effect of handling kPROOF_STOPPROCESS /// If the output list from this worker is going to be sent back to the master, /// the 'status' includes the number of entries processed by the slave. /// From this we calculate the remaining part of the packet. /// 0 indicates that the results from that worker were lost completely. 
/// Assume that the filenodes for which we have a TFileNode object /// are still up and running. void TPacketizerAdaptive::MarkBad(TSlave *s, TProofProgressStatus *status, TList **listOfMissingFiles) { TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s)); if (!slaveStat) { Error("MarkBad", "Worker does not exist"); return; } // Update worker counters if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) { slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName()); slaveStat->fCurFile->GetNode()->DecRunSlaveCnt(); } // If status is defined, the remaining part of the last packet is // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS if (!status) { // Get the subset processed by the bad worker. TList *subSet = slaveStat->GetProcessedSubSet(); if (subSet) { // Take care of the current packet if (slaveStat->fCurElem) { subSet->Add(slaveStat->fCurElem); } // Merge overlapping or subsequent elements Int_t nmg = 0, ntries = 100; TDSetElement *e = 0, *enxt = 0; do { nmg = 0; e = (TDSetElement *) subSet->First(); while ((enxt = (TDSetElement *) subSet->After(e))) { if (e->MergeElement(enxt) >= 0) { nmg++; subSet->Remove(enxt); delete enxt; } else { e = enxt; } } } while (nmg > 0 && --ntries > 0); // reassign the packets assigned to the bad slave and save the size; SplitPerHost(subSet, listOfMissingFiles); // the elements were reassigned so should not be deleted subSet->SetOwner(0); } else { Warning("MarkBad", "subset processed by bad worker not found!"); } (*fProgressStatus) -= *(slaveStat->GetProgressStatus()); } // remove slavestat from the map fSlaveStats->Remove(s); delete slaveStat; // recalculate fNEventsOnRemLoc and others InitStats(); } //////////////////////////////////////////////////////////////////////////////// /// The file in the listOfMissingFiles can appear several times; /// in order to fix that, a TDSetElement::Merge method is needed. 
Int_t TPacketizerAdaptive::ReassignPacket(TDSetElement *e, TList **listOfMissingFiles) { if (!e) { Error("ReassignPacket", "empty packet!"); return -1; } // Check the old filenode TUrl url = e->GetFileName(); // Check the host from which 'e' was previously read. // Map non URL filenames to dummy host TString host; if (!url.IsValid() || strncmp(url.GetProtocol(),"root", 4)) { host = "no-host"; } else { host = url.GetHost(); } // If accessible add it back to the old node // and do DecProcessed TFileNode *node = (TFileNode*) fFileNodes->FindObject( host ); if (node && fTryReassign) { // The packet 'e' was processing data from this node. node->DecreaseProcessed(e->GetNum()); // The file should be already in fFilesToProcess ... node->Add(e, kFALSE); if (!fUnAllocated->FindObject(node)) fUnAllocated->Add(node); return 0; } else { // Add to the list of missing files TFileInfo *fi = e->GetFileInfo(); if (listOfMissingFiles && *listOfMissingFiles) (*listOfMissingFiles)->Add((TObject *)fi); return -1; } } //////////////////////////////////////////////////////////////////////////////// /// Split into per host entries /// The files in the listOfMissingFiles can appear several times; /// in order to fix that, a TDSetElement::Merge method is needed. void TPacketizerAdaptive::SplitPerHost(TList *elements, TList **listOfMissingFiles) { if (!elements) { Error("SplitPerHost", "Empty list of packets!"); return; } if (elements->GetSize() <= 0) { Error("SplitPerHost", "The input list contains no elements"); return; } TIter subSetIter(elements); TDSetElement *e; while ((e = (TDSetElement*) subSetIter.Next())) { if (ReassignPacket(e, listOfMissingFiles) == -1) { // Remove from the list in order to delete it. if (elements->Remove(e)) Error("SplitPerHost", "Error removing a missing file"); delete e; } } }