# This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus # # MAUS is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # MAUS is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with MAUS. If not, see . """ Test for MapCppCuts """ # pylint: disable=C0103 # pylint: disable=R0904 import os import json import unittest import ROOT import urllib2 import tarfile from Configuration import Configuration import maus_cpp.converter from MAUS import MapCppCuts class MapCppCutsTest(unittest.TestCase): """Tests for MapCppCuts""" def setUp(self): # pylint: disable=C0103, C0202 """Check we can construct the mapper""" self.mapper = MapCppCuts() cards_json = json.loads(Configuration().getConfigJSON()) self.cards_str = json.dumps(cards_json) @classmethod def setUpClass(cls): """Download and untar reco file""" filedata = urllib2.urlopen('http://gfe02.grid.hep.ph.ic.ac.uk:8301/RECO/MAUS-v3.0.1/1/Step4/09900/09954_offline.tar?action=show') datatowrite = filedata.read() with open('%s/third_party/install/share/test_data/09954_offline.tar' % os.environ.get("MAUS_ROOT_DIR"), 'wb') as f: f.write(datatowrite) tar = tarfile.open('%s/third_party/install/share/test_data/09954_offline.tar' % os.environ.get("MAUS_ROOT_DIR")) tar.extractall(path='%s/third_party/install/share/test_data/' % os.environ.get("MAUS_ROOT_DIR")) tar.close() my_file_name = ("%s/third_party/install/share/test_data/09954_recon.root" %\ os.environ.get("MAUS_ROOT_DIR")) # now load the ROOT file print "Loading ROOT file", my_file_name cls.root_file = ROOT.TFile(my_file_name, "READ") # pylint: disable = E1101 # and set up the data tree to be filled by ROOT IO print "Setting up data tree" cls.data_ptr = ROOT.MAUS.Data() # pylint: disable = E1101 cls.tree = cls.root_file.Get("Spill") cls.tree.SetBranchAddress("data", cls.data_ptr) @classmethod def load_data(cls, spill_number): """Load the data""" # We will try to load the data now into a analysable format print "spill number", spill_number if spill_number > 0 and spill_number < cls.tree.GetEntries(): #print tree.GetEntries() print "# entries ", cls.tree.GetEntries() cls.tree.GetEntry(spill_number) return cls.data_ptr #spill = data_ptr.GetSpill() else: raise IndexError("Tried to get a spill which was out of range") ######## tests on Process ######### def test_birth(self): # pylint: disable=R0201 """Check we pass correct input to birth""" self.mapper.birth(self.cards_str) def test_empty(self): """Check mapper runs for empty string, returning an error""" self.assertRaises(ValueError, self.mapper.birth, "") result = self.mapper.process("") doc = maus_cpp.converter.json_repr(result) self.assertIn("errors", doc) self.assertIn("MapCppCuts", doc["errors"]) def test_process(self): """Check mapper for cuts, check if cuts are passed correctly to the vector""" fout = open("adding_cuts.json", "w") self.mapper.birth(self.cards_str) # Default set of cuts cuts_events = {"cuts_value_store":[False]*12} # Since the cuts are not implemented in the data structure yet, they have to be # added manually. for i in range(1, 100): #16200 entries data_ptr = self.load_data(i) data_json = maus_cpp.converter.json_repr(data_ptr) if 'recon_events' in data_json.keys(): recon_events = data_json['recon_events'] print "#recon_events", len(recon_events) if len(recon_events)==0: print "recon events is empty" continue for k in range(len(recon_events)): first_event = recon_events[k] first_event['cuts'] = cuts_events result = self.mapper.process(data_json) # spill = maus_cpp.converter.json_repr(result) # print "Recon events in spill:", len(spill["recon_events"]) # Write to json file print >> fout, json.dumps(data_json) # test with some events # Test data for event 0 with 1 spill # reference_data_0 = [ # [False]*12, # ] # Result now contains the data from processor -> now get the spills from a recon event if result.GetSpill().GetReconEvents() == 0: rec0 = result.GetSpill().GetReconEvents()[0] print "empty event", rec0 if result.GetSpill().GetReconEvents() != 0: # This part contains the test of the first 3 cuts # If one of the first two cuts (tof0 and tof1 spacepoint hit) fails # then the third cut (time of flight) should also fail cut_store_0 = rec0.GetCutEvent().GetCutStore() # cut_store_1 = rec0.GetCutEvent().GetCutStore() tof0_sps = rec0.GetTOFEvent().GetTOF0SpacePointArray() # tof1_sps = rec0.GetTOFEvent().GetTOF1SpacePointArray() # for tof0_sps in range(tof0_sps.size()): # tof0_hitTime = tof0_sps.GetTime() # for tof1_sps in range(tof1_sps.size()): # tof1_hitTime = tof1_sps.GetTime() #dt = tof1_hitTime - tof0_hitTime #willCut_0 = True willCut_1 = False for sp_0 in tof0_sps: if len(sp_0) == 0: self.assertTrue(cut_store_0[0], willCut_1) self.assertTrue(cut_store_0[2], willCut_1) # if len(sp_0) == 1: # self.assertFalse(cut_store_0[0], willCut_1) # for sp_1 in tof1_sps: # if len(sp_1) == 0: # self.assertTrue(cut_store_0[1], willCut_1) # self.assertTrue(cut_store_0[2], willCut_1) # if len(sp_1) == 1: # self.assertFalse(cut_store_0[1], willCut_1) # if cut_store_0[0] == True and cut_store_0[1] == True: # Make sure the cuts for the time of flight correspond to the ones # in the configuration file # if dt >= 28. and dt <= 42.: # self.assertFalse(cut_store_0[2], willCut_1) # if dt < 28. or dt > 42.: # self.assertTrue(cut_store_0[2], willCut_1) # Check that the values that are set correspond to ref_data_0 #for ref in reference_data_0: # for i in range(cut_store_1.size()): # self.assertFalse(cut_store_1[i], ref[i]) def tearDown(self): # pylint: disable=C0103, C0202 """Check we can end the mapper""" self.mapper.death() self.mapper = None if __name__ == '__main__': unittest.main()