// @(#)root/proofplayer:$Id$ // Author: G.Ganis July 2011 /************************************************************************* * Copyright (C) 1995-2004, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * * For the list of contributors see $ROOTSYS/README/CREDITS. * *************************************************************************/ ////////////////////////////////////////////////////////////////////////// // // // TProofMonSenderML // // // // TProofMonSender implementation for the ML writer. // // // ////////////////////////////////////////////////////////////////////////// #include "TProofMonSenderML.h" #include "TDSet.h" #include "TFileInfo.h" #include "THashList.h" #include "TList.h" #include "TParameter.h" #include "TPluginManager.h" #include "TProofDebug.h" #include "TROOT.h" #include "TSystem.h" #include "TVirtualMonitoring.h" //________________________________________________________________________ TProofMonSenderML::TProofMonSenderML(const char *serv, const char *tag, const char *id, const char *subid, const char *opt) : TProofMonSender(serv, "ProofMonSenderML") { // Main constructor fWriter = 0; // Init the sender instance using the plugin manager TPluginHandler *h = 0; if ((h = gROOT->GetPluginManager()->FindHandler("TVirtualMonitoringWriter", "MonaLisa"))) { if (h->LoadPlugin() != -1) { fWriter = (TVirtualMonitoringWriter *) h->ExecPlugin(5, serv, tag, id, subid, opt); if (fWriter && fWriter->IsZombie()) SafeDelete(fWriter); } } // Flag this instance as valid if the writer initialization succeeded if (fWriter) ResetBit(TObject::kInvalidObject); // Set default send control options SetBit(TProofMonSender::kSendSummary); ResetBit(TProofMonSender::kSendDataSetInfo); ResetBit(TProofMonSender::kSendFileInfo); fSummaryVrs = 1; fDataSetInfoVrs = 1; fFileInfoVrs = 1; // Transfer verbosity requirements PDB(kMonitoring,1) if (fWriter) fWriter->Verbose(kTRUE); } //________________________________________________________________________ TProofMonSenderML::~TProofMonSenderML() { // Destructor SafeDelete(fWriter); } //________________________________________________________________________ Int_t TProofMonSenderML::SendSummary(TList *recs, const char *id) { // Send summary record // // There are three versions of this record, corresponding the evolution // in time of the monitoring requirements. // // The default version 2 contains the following information // // user XRD_STRING // proofgroup XRD_STRING // begin XRD_STRING // end XRD_STRING // walltime XRD_REAL64 // cputime XRD_REAL64 // bytesread XRD_REAL64 // events XRD_REAL64 // totevents XRD_REAL64 // workers XRD_REAL64 // vmemmxw XRD_REAL64 // rmemmxw XRD_REAL64 // vmemmxm XRD_REAL64 // rmemmxm XRD_REAL64 // numfiles XRD_REAL64 // missfiles XRD_REAL64 // status XRD_REAL64 // rootver XRD_STRING // // Version 1 contains the following information // (no 'status', 'missfiles', 'rootver'; 'dataset' field with name(s) of // processed dataset(s)) // // user XRD_STRING // proofgroup XRD_STRING // begin XRD_STRING // end XRD_STRING // walltime XRD_REAL64 // cputime XRD_REAL64 // bytesread XRD_REAL64 // events XRD_REAL64 // totevents XRD_REAL64 // workers XRD_REAL64 // vmemmxw XRD_REAL64 // rmemmxw XRD_REAL64 // vmemmxm XRD_REAL64 // rmemmxm XRD_REAL64 // numfiles XRD_REAL64 // dataset XRD_STRING // // Version 0 contains the following information // ('group' instead of 'proofgroup'; no 'vmemmxw', // 'rmemmxw', 'vmemmxm', 'rmemmxm', 'numfiles', 'dataset') // // user XRD_STRING // group XRD_STRING // begin XRD_STRING // end XRD_STRING // walltime XRD_REAL64 // cputime XRD_REAL64 // bytesread XRD_REAL64 // events XRD_REAL64 // totevents XRD_REAL64 // workers XRD_REAL64 // // Return 0 on success, -1 on any failure. if (!IsValid()) { Error("SendSummary", "invalid instance: do nothing!"); return -1; } // Are we requested to send this info? if (!TestBit(TProofMonSender::kSendSummary)) return 0; // Make sure we have something to send if (!recs || (recs && recs->GetSize() <= 0)) { Error("SendSummary", "records list undefined or empty!"); return -1; } TList *xrecs = recs; PDB(kMonitoring,1) Info("SendSummary", "preparing (qid: '%s')", id); // Do not send duplicated information TObject *qtag = recs->FindObject("querytag"); if (qtag) recs->Remove(qtag); TObject *dsn = 0; // We may need to correct some variable names first if (fSummaryVrs == 0) { if ((dsn = recs->FindObject("dataset"))) recs->Remove(dsn); } else if (fSummaryVrs == 0) { // Only the first records xrecs = new TList; xrecs->SetOwner(kFALSE); TIter nxr(recs); TObject *o = 0; while ((o = nxr())) { if (!strcmp(o->GetName(), "vmemmxw")) break; xrecs->Add(o); } } PDB(kMonitoring,1) Info("SendSummary", "sending (%d entries)", xrecs->GetSize()); // Now we are ready to send Bool_t rc = fWriter->SendParameters(xrecs, id); // Restore the "dataset" entry in the list if (fSummaryVrs > 1 && dsn && xrecs == recs) { TObject *num = recs->FindObject("numfiles"); if (num) recs->AddBefore(num, dsn); else recs->Add(dsn); } // Restore the "querytag" entry in the list if (qtag) { TObject *wrks = recs->FindObject("workers"); if (wrks) recs->AddAfter(wrks, qtag); else recs->Add(qtag); } if (xrecs != recs) SafeDelete(xrecs); // Done return (rc ? 0 : -1); } //________________________________________________________________________ Int_t TProofMonSenderML::SendDataSetInfo(TDSet *dset, TList *missing, const char *begin, const char *qid) { // Post information about the processed dataset(s). The information is taken // from the TDSet object 'dset' and integrated with the missing files // information in the list 'missing'. The string 'qid' is the uninque // ID of the query; 'begin' the starting time. // // The records sent by this call will appear with ids 'dataset_' // // There are two versions of this record, with or without the starting time. // The starting time could be looked up from the summary record, if available. // // The default version 1 contains the following information: // // dsn XRD_STRING // querytag XRD_STRING // querybegin XRD_STRING // numfiles XRD_REAL64 // missfiles XRD_REAL64 // // Version 0 contains the following information: // (no 'querybegin') // // dsn XRD_STRING // querytag XRD_STRING // numfiles XRD_REAL64 // missfiles XRD_REAL64 // // The information is posted with a bulk insert. // // Returns 0 on success, -1 on failure. if (!IsValid()) { Error("SendDataSetInfo", "invalid instance: do nothing!"); return -1; } // Are we requested to send this info? if (!TestBit(TProofMonSender::kSendDataSetInfo)) return 0; // The query id (tag) must be given if (!qid || (qid && strlen(qid) <= 0)) { Error("SendDataSetInfo", "query id (tag) undefined!"); return -1; } // The dataset must be given if (!dset) { Error("SendDataSetInfo", "TDSet object undefined! (qid: '%s')", qid); return -1; } PDB(kMonitoring,1) Info("SendDataSetInfo", "preparing (qid: '%s')", qid); TList plets; // Extract the information and save it into the relevant multiplets TString dss(dset->GetName()), ds; Ssiz_t from = 0; while ((dss.Tokenize(ds, from , "[,| ]"))) { // Create a new TDSetPlet and add it to the list plets.Add(new TDSetPlet(ds.Data())); } // Now try to count the files TDSetPlet *plet = 0; TIter nxpl(&plets); TObject *o = 0; TDSetElement *e = 0, *ee = 0; TDSet *dsete = 0; TIter nxe(dset->GetListOfElements()); TString dse; while ((o = nxe())) { if ((e = dynamic_cast(o))) { dse = e->GetDataSet(); if (!dse.IsNull()) { nxpl.Reset(); while ((plet = (TDSetPlet *) nxpl())) { if (dse == plet->GetName()) { plet->fFiles += 1; break; } } } } else if ((dsete = dynamic_cast(o))) { PDB(kMonitoring,1) Info("SendDataSetInfo", "dset '%s' (%d files)", o->GetName(), dsete->GetListOfElements()->GetSize()); TIter nxee(dsete->GetListOfElements()); while ((ee = (TDSetElement *) nxee())) { dse = ee->GetDataSet(); if (!dse.IsNull()) { nxpl.Reset(); while ((plet = (TDSetPlet *) nxpl())) { if (dse == plet->GetName()) { plet->fFiles += 1; plet->fDSet = dsete; break; } } } } } else { Warning("SendDataSetInfo", "ignoring unknown element type: '%s'", o->ClassName()); } } // Now try to include the missing files info if (missing) { TFileInfo *fi = 0; TIter nxm(missing); TString dsfi, fn; while ((fi = (TFileInfo *) nxm())) { dsfi = fi->GetTitle(); if (!dsfi.IsNull() && dsfi != "TFileInfo") { nxpl.Reset(); while ((plet = (TDSetPlet *) nxpl())) { if (dsfi == plet->GetName()) { fn = fi->GetCurrentUrl()->GetUrl(); if (plet->fDSet && plet->fDSet->GetListOfElements() && !(plet->fDSet->GetListOfElements()->FindObject(fn))) plet->fFiles += 1; plet->fMissing += 1; break; } } } } } // Prepare objects to be sent TList values; TNamed *nm_dsn = new TNamed("dsn", ""); values.Add(nm_dsn); TNamed *nm_querytag = new TNamed("querytag", qid); values.Add(nm_querytag); TNamed *nm_begin = 0; if (fDataSetInfoVrs > 0) { nm_begin = new TNamed("begin", begin); values.Add(nm_begin); } TParameter *pi_numfiles = new TParameter("numfiles", -1); values.Add(pi_numfiles); TParameter *pi_missfiles = new TParameter("missfiles", -1); values.Add(pi_missfiles); PDB(kMonitoring,1) Info("SendDataSetInfo", "sending (%d entries)", plets.GetSize()); Bool_t rc = kTRUE; TString dsnh; nxpl.Reset(); while ((plet = (TDSetPlet *) nxpl())) { nm_dsn->SetTitle(plet->GetName()); pi_numfiles->SetVal(plet->fFiles); pi_missfiles->SetVal(plet->fMissing); dsnh.Form("dataset_%x", TString(plet->GetName()).Hash()); if (!(rc = fWriter->SendParameters(&values, dsnh.Data()))) break; } // Done return (rc ? 0 : -1); } //________________________________________________________________________ Int_t TProofMonSenderML::SendFileInfo(TDSet *dset, TList *missing, const char *begin, const char *qid) { // Post information about the requested files. The information is taken // from the TDSet object 'dset' and integrated with the missing files // information in the list 'missing'. The string 'qid' is the unique // ID of the query; 'begin' the starting time. // // The records sent by this call will appear with ids 'file_' // // There are two versions of this record, with or without the starting time. // The starting time could be looked up from the summary record, if available. // // The default version 1 contains the following information: // // lfn XRD_STRING // path XRD_STRING // querytag XRD_STRING // querybegin XRD_STRING // status XRD_REAL64 // // Version 0 contains the following information: // (no 'querybegin') // // lfn XRD_STRING // path XRD_STRING // querytag XRD_STRING // status XRD_REAL64 // // The information is posted with a bulk insert. // // Returns 0 on success, -1 on failure. if (!IsValid()) { Error("SendFileInfo", "invalid instance: do nothing!"); return -1; } // Are we requested to send this info? if (!TestBit(TProofMonSender::kSendFileInfo)) return 0; // The query id (tag) must be given if (!qid || (qid && strlen(qid) <= 0)) { Error("SendFileInfo", "query id (tag) undefined!"); return -1; } // The dataset must be given if (!dset) { Error("SendFileInfo", "TDSet object undefined! (qid: '%s')", qid); return -1; } PDB(kMonitoring,1) Info("SendFileInfo", "preparing (qid: '%s')", qid); THashList hmiss; if (missing) { TIter nxfm(missing); TFileInfo *fi = 0; while ((fi = (TFileInfo *)nxfm())) { hmiss.Add(new TObjString(fi->GetCurrentUrl()->GetUrl())); } hmiss.Print(); } // Prepare objects to be sent TList values; TNamed *nm_lnf = new TNamed("lnf", ""); values.Add(nm_lnf); TNamed *nm_path = new TNamed("path", ""); values.Add(nm_path); TNamed *nm_querytag = new TNamed("querytag", qid); values.Add(nm_querytag); TNamed *nm_begin = 0; if (fFileInfoVrs > 0) { nm_begin = new TNamed("begin", begin); values.Add(nm_begin); } TParameter *pi_status = new TParameter("status", -1); values.Add(pi_status); PDB(kMonitoring,1) Info("SendFileInfo", "sending (%d entries)", dset->GetListOfElements()->GetSize()); // Loop over files Bool_t rc = kTRUE; TObject *o = 0; TDSetElement *e = 0, *ee = 0; TDSet *dsete = 0; TIter nxe(dset->GetListOfElements()); TString fne, fneh; Int_t status = -1; while ((o = nxe())) { if ((e = dynamic_cast(o))) { fne = e->GetName(); // Try to determine the status status = 1; if (hmiss.FindObject(fne)) status = 0; // Prepare the parameters list nm_lnf->SetTitle(gSystem->BaseName(fne)); nm_path->SetTitle(gSystem->DirName(fne)); pi_status->SetVal(status); fneh.Form("file_%x", TString(TUrl(fne.Data()).GetFile()).Hash()); if (!(rc = fWriter->SendParameters(&values, fneh.Data()))) break; } else if ((dsete = dynamic_cast(o))) { PDB(kMonitoring,1) Info("SendFileInfo", "dset '%s' (%d files)", o->GetName(), dsete->GetListOfElements()->GetSize()); TIter nxee(dsete->GetListOfElements()); while ((ee = (TDSetElement *) nxee())) { fne = ee->GetName(); // Try to determine the status status = 1; if (hmiss.FindObject(fne)) status = 0; // Prepare the parameters list nm_lnf->SetTitle(gSystem->BaseName(fne)); nm_path->SetTitle(gSystem->DirName(fne)); pi_status->SetVal(status); fneh.Form("file_%x", TString(TUrl(fne.Data()).GetFile()).Hash()); if (!(rc = fWriter->SendParameters(&values, fneh.Data()))) break; } } else { Warning("SendFileInfo", "ignoring unknown element type: '%s'", o->ClassName()); } } // Done return (rc ? 0 : -1); }