// InDispatchProducer.cc // Contact person: Anthony LaTorre <tlatorre@hep.uchicago.edu> // See InDispatchProducer.hh for more details. //—————————————————————---------------------------------------------——// #include <RAT/zdab_file.hh> #include <RAT/zdab_dispatch.hh> #include <RAT/InDispatchProducer.hh> #include <RAT/ProcBlock.hh> #include <RAT/SignalHandler.hh> #include <RAT/Log.hh> #include <RAT/DB.hh> #include <RAT/DS/Root.hh> #include <RAT/DS/Run.hh> #include <RAT/DS/RunStore.hh> #include <G4UIdirectory.hh> #include <G4UIcmdWithAString.hh> #include <TChain.h> #include <assert.h> namespace RAT { InDispatchProducer::InDispatchProducer() { fMainBlock = 0; Init(); } InDispatchProducer::InDispatchProducer(ProcBlock *block) { SetMainBlock(block); Init(); } void InDispatchProducer::Init() { // Build commands G4UIdirectory* DebugDir = new G4UIdirectory("/rat/indispatch/"); DebugDir->SetGuidance("Read Events from a dispatch stream"); fReadCmd = new G4UIcmdWithAString("/rat/indispatch/read", this); fReadCmd->SetGuidance("dispatcher hostname"); fReadCmd->SetParameterName("hostname", false); // required } G4String InDispatchProducer::GetCurrentValue(G4UIcommand* /*command*/) { Log::Die("Invalid InDispatch command"); return G4String("You never see this."); } void InDispatchProducer::SetNewValue(G4UIcommand * command, G4String newValue) { // fReadCmd if(command == fReadCmd){ std::string hostname; hostname = newValue; if(!fMainBlock) Log::Die("InDispatch: No main block declared! (can't happen)"); try{ ReadEvents(hostname); } catch(ratzdab::dispatcher_connection_error & ){ Log::Die(dformat("Could not connect to %s", hostname.c_str())); } } else{ Log::Die("invalid InDispatch command"); } } /* * Takes a zdab dispatch stream zdab and reads the next record. Writes the results * into inds if it is event data, run if it is run data, and/or updates * internal variables without modifying either, or does nothing if it * doesn't know how to handle the current record. Sets gotevent to true * if it read an event (as opposed to a run header, etc.). Returns true * if it was able to read something, regardless of whether it knew what * to do with it. Returns false when zdab.next() returns NULL, normally * at the end of the file. */ static bool ReadNextRecord(DS::Root * & inds, DS::Run * run, ratzdab::dispatch & zdab) { TObject* const r = zdab.next(true); if(!r) return false; const TClass * const risa = r->IsA(); // This and other static variables in this function are not // thread-safe. This is fine as long as we don't try to use threads, // which we don't and are not planning to. static bool run_active = false; // containers for data that spans multiple events static DS::TriggerHeader* current_trig = NULL; static DS::ECAHeader* current_eped = NULL; // handle record types if(risa == DS::Root::Class()) { inds = dynamic_cast<DS::Root*>(r); static bool run_level_data_set = false; // some run-level data has to come from an event if(run_active && !run_level_data_set) { run->SetMCFlag(0); // no mc zdabs run->SetPackVer(0); // not in event, maybe MAST? run->SetDataType(0); // ??? run_level_data_set = true; } // set event headers if available DS::HeaderInfo* header = inds->GetHeaderInfo(); if(current_trig) { DS::TriggerHeader* triginfo = header->GetTriggerHeader(); *triginfo = *current_trig; } if(current_eped) { DS::ECAHeader* epedinfo = header->GetECAHeader(); *epedinfo = *current_eped; } } else if(risa == DS::Run::Class()) { *run = *(dynamic_cast<DS::Run*>(r)); run_active = true; info << "InDispatch: RHDR: Run " << run->GetRunID() << newline; DS::RunStore::AddNewRun(run); } else if(risa == DS::AVStat::Class()) { if(run_active) { DS::AVStat* avstat = run->GetAVStat(); *avstat = *(dynamic_cast<DS::AVStat*>(r)); info << "InDispatch: AVStat: Run updated" << newline; } else { warn << "InDispatch: Ignoring AVStat--there is no run active" << newline; } } else if(risa == DS::ManipStat::Class()) { if(run_active) { DS::ManipStat* mstat = run->GetManipStat(); *mstat = *(dynamic_cast<DS::ManipStat*>(r)); info << "InDispatch: ManipStat: Run updated" << newline; } else { warn << "InDispatch: Ignoring ManipStat--there is no run active" << newline; } } else if(risa == DS::TriggerHeader::Class()) { delete current_trig; current_trig = dynamic_cast<DS::TriggerHeader*>(r); info << "InDispatch: TriggerHeader\n"; } else if(risa == DS::ECAHeader::Class()) { delete current_eped; current_eped = dynamic_cast<DS::ECAHeader*>(r); info << "InDispatch: ECA\n"; } else if(risa == TObject::Class()) { info << "InDispatch: risa == TObject::Class()" << newline; //a record has been swallowed by converter on purpose } else { warn << "InDispatch: Unhandled ROOT object, type " << r->ClassName() << newline; } return true; } void InDispatchProducer::ReadEvents(const G4String & hostname) { info << "InDispatchProducer: Reading from " << hostname << newline; ratzdab::dispatch zdf(hostname); DS::Run* run = new DS::Run(); DS::Run* prevRun = NULL; long long defaultNumEvents = DB::Get()->GetLink( "IO" )->GetI( "default_num_events" ); long long count = 0; while(!SignalHandler::IsTermRequested()) { DS::Root *inds = NULL; try { if(!ReadNextRecord(inds, run, zdf)) break; } catch(ratzdab::unknown_record_error& e) { warn << "InDispatch: unknown record" << newline; continue; } if(inds) inds->SetRunHere(run); static bool first = true; if(first) { // First event, begin the run first = false; Producer::BeginOfRun(run->GetRunID()); fMainBlock->BeginOfRun(run); prevRun = run; } if(prevRun != run) { // Run has changed Producer::BeginOfRun(run->GetRunID()); fMainBlock->EndOfRun(prevRun); fMainBlock->BeginOfRun(run); prevRun = run; } if(inds){ fMainBlock->DSEvent(inds); delete inds; } if (defaultNumEvents == count) break; ++count; } // Finished, end the run fMainBlock->EndOfRun(prevRun); } } // namespace RAT