"""
Tests for ReducePyScalersTable module.
"""
# This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
#
# MAUS is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MAUS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MAUS. If not, see .
# pylint: disable=C0103
from datetime import datetime
import json
import time
import unittest
from ReducePyScalersTable import Scaler
from ReducePyScalersTable import ReducePyScalersTable
class ScalerTestCase(unittest.TestCase): # pylint: disable=R0904, C0301
"""
Test class for Scaler.
"""
def setUp(self):
"""
Prepare for test by setting up a Scaler.
@param self Object reference.
"""
self.__scaler = Scaler()
def test_default_values(self):
"""
Check default values.
@param self Object reference.
"""
self.assertEquals(0, self.__scaler.get_count(),
"Unexpected count")
self.assertEquals(10, self.__scaler.get_recent_window(),
"Unexpected window")
self.assertEquals(0, self.__scaler.get_average(),
"Unexpected average")
self.assertEquals(0, self.__scaler.get_recent_average(),
"Unexpected recent average")
self.assertEquals(0, self.__scaler.get_recent_value(),
"Unexpected recent value")
def test_init_recent_window(self):
"""
Test setting a new window in the constructor.
@param self Object reference.
"""
scaler = Scaler(123)
self.assertEquals(123, scaler.get_recent_window(),
"Unexpected window")
def test_set_recent_window(self):
"""
Test setting a new window.
@param self Object reference.
"""
scaler = Scaler()
self.assertEquals(10, scaler.get_recent_window(),
"Unexpected window")
scaler.set_recent_window(123)
self.assertEquals(123, scaler.get_recent_window(),
"Unexpected window after set_recent_window")
def test_add_value_clear(self):
"""
Test add_value and clear.
@param self Object reference.
"""
for i in range(0, 10):
self.__scaler.add_value(i)
self.__scaler.clear()
self.assertEquals(0, self.__scaler.get_count(),
"Unexpected count")
self.assertEquals(0, self.__scaler.get_average(),
"Unexpected average")
self.assertEquals(0, self.__scaler.get_recent_average(),
"Unexpected recent average")
self.assertEquals(0, self.__scaler.get_recent_value(),
"Unexpected recent value")
def test_add_value(self):
"""
Test add_value and getters
@param self Object reference.
"""
values = range(0, 20)
for i in values:
self.__scaler.add_value(i)
self.assertEquals(len(values), self.__scaler.get_count(),
"Unexpected count")
self.assertEquals(sum(values) / len(values),
self.__scaler.get_average(),
"Unexpected average")
start = len(values) - self.__scaler.get_recent_window()
end = len(values)
self.assertEquals(
sum(values[start:end]) / self.__scaler.get_recent_window(),
self.__scaler.get_recent_average(),
"Unexpected recent average")
self.assertEquals(19, self.__scaler.get_recent_value(),
"Unexpected recent value")
class ReducePyScalersTableTestCase(unittest.TestCase): # pylint: disable=R0904, C0301
"""
Test class for ReducePyScalersTable.
"""
@classmethod
def setUpClass(self): # pylint: disable=C0202
"""
Prepare for test by setting up worker.
@param self Object reference.
"""
self.__reducer = ReducePyScalersTable()
def setUp(self):
"""
Invoke "birth" and check for success.
@param self Object reference.
"""
success = self.__reducer.birth("{}")
if not success:
raise Exception('Test setUp failed', 'reducer.birth() failed')
def test_get_scalers(self):
"""
Test get_scalers.
@param self Object reference.
"""
self.assertEquals(6, len(self.__reducer.get_scalers()),
"Unexpected number of scalers")
def test_recent_scalers_window(self):
"""
Test birth where a recent_scalers_window is given.
@param self Object reference.
"""
reducer = ReducePyScalersTable()
reducer.birth("""{"recent_scalers_window":123}""")
(_, _, scaler) = reducer.get_scalers()[0]
# Check that the value was passed down to the scalers.
self.assertEquals(123, scaler.get_recent_window(),
"Unexpected recent window")
def test_invalid_json(self):
"""
Test "process" with a bad JSON document as an argument string.
@param self Object reference.
"""
result_str = self.__reducer.process("{")
result = json.loads(result_str)
self.assertTrue("errors" in result, "No errors field")
errors = result["errors"]
self.assertTrue("ReducePyScalersTable" in errors,
"No ReducePyScalersTable field")
errors = errors["ReducePyScalersTable"]
self.assertTrue(len(errors) >= 1, "Missing error trace")
self.assertEquals(": Expecting object: line 1 column 0 (char 0)", errors[0], "Unexpected error trace") # pylint: disable=C0301
def test_no_daq_data(self):
"""
Test "process" with a JSON document with no "daq_data".
@param self Object reference.
"""
result = self.__process({})
self.assertTrue("errors" in result, "No errors field")
errors = result["errors"]
self.assertTrue("ReducePyScalersTable" in errors,
"No ReducePyScalersTable field")
errors = errors["ReducePyScalersTable"]
self.assertTrue(len(errors) >= 1, "Missing error trace")
self.assertEquals("Bad input spill - no maus_event_type", errors[0], "Unexpected error trace "+str(errors[0])) # pylint: disable=C0301
# Scalar, for validation.
expected = Scaler()
self.__check_result(result, "", None, expected)
def test_daq_data_none(self):
"""
Test "process" with a JSON document with a "daq_data"
with value None.
@param self Object reference.
"""
result = self.__process({"daq_event_type":"physics_event",
"daq_data":None,
"maus_event_type":"Spill",})
self.assertTrue("errors" in result, "No errors field")
errors = result["errors"]
self.assertTrue("ReducePyScalersTable" in errors,
"No ReducePyScalersTable field")
errors = errors["ReducePyScalersTable"]
self.assertTrue(len(errors) >= 1, "Missing error trace")
self.assertEquals(": daq_data is None", errors[0], "Unexpected error trace") # pylint: disable=C0301
# Scalar, for validation.
expected = Scaler()
self.__check_result(result, "", None, expected)
def test_no_v830(self):
"""
Test "process" with a JSON document with no "V830".
@param self Object reference.
"""
result = self.__process({"daq_event_type":"physics_event",
"daq_data":{},
"maus_event_type":"Spill",})
self.assertTrue("errors" in result, "No errors field")
errors = result["errors"]
self.assertTrue("ReducePyScalersTable" in errors,
"No ReducePyScalersTable field")
errors = errors["ReducePyScalersTable"]
self.assertTrue(len(errors) >= 1, "Missing error trace")
self.assertEquals(": 'V830 is not in spill'", errors[0], "Unexpected error trace") # pylint: disable=C0301
# Scalar, for validation.
expected = Scaler()
self.__check_result(result, "", None, expected)
def test_no_channels(self):
"""
Test "process" with a JSON document with no "channels".
@param self Object reference.
"""
result = self.__process({"daq_event_type":"physics_event",
"daq_data":{"V830":{}},
"maus_event_type":"Spill",})
self.assertTrue("errors" in result, "No errors field")
errors = result["errors"]
self.assertTrue("ReducePyScalersTable" in errors,
"No ReducePyScalersTable field")
errors = errors["ReducePyScalersTable"]
self.assertTrue(len(errors) >= 1, "Missing error trace")
self.assertEquals(": 'channels is not in spill'", errors[0], "Unexpected error trace") # pylint: disable=C0301
# Scalar, for validation.
expected = Scaler()
self.__check_result(result, "", None, expected)
def test_no_ch0(self):
"""
Test "process" with a JSON document with no "ch0".
@param self Object reference.
"""
result = self.__process({"daq_data":{"V830":{"channels":{}}},
"daq_event_type":"physics_event",
"maus_event_type":"Spill",})
self.assertTrue("errors" in result, "No errors field")
errors = result["errors"]
self.assertTrue("ReducePyScalersTable" in errors,
"No ReducePyScalersTable field")
errors = errors["ReducePyScalersTable"]
self.assertTrue(len(errors) >= 1, "Missing error trace")
self.assertEquals(": 'ch0 is not in spill'", errors[0], "Unexpected error trace") # pylint: disable=C0301
# Scalar, for validation.
expected = Scaler()
print result
self.__check_result(result, "", None, expected)
def __process(self, json_doc):
"""
Convert given JSON document to a string and pass to "process".
@param self Object reference.
@param json_doc JSON document.
@returns JSON document string from "process".
"""
json_str = json.dumps(json_doc)
result_str = self.__reducer.process(json_str)
return json.loads(result_str)
@staticmethod
def __get_spill(event, time_stamp, value):
"""
Create a sample spill.
@param event Event ID to put in spill.
@param time_stamp Time stamp to put in spill.
@params value Value for each channel.
@return spill
"""
spill = {}
spill["daq_event_type"] = "physics_event"
spill["maus_event_type"] = "Spill"
spill["daq_data"] = {}
spill["daq_data"]["V830"] = {}
scalers = spill["daq_data"]["V830"]
scalers["phys_event_number"] = event
scalers["time_stamp"] = time_stamp
hits = {}
hits["ch0"] = value
hits["ch1"] = value
hits["ch2"] = value
hits["ch3"] = value
hits["ch4"] = value
hits["ch12"] = value
scalers["channels"] = hits
return spill
def test_process_spill(self):
"""
Test "process" with a spill.
@param self Object reference.
"""
current_time = time.time()
spill = ReducePyScalersTableTestCase.__get_spill( \
"process_spill", current_time, 1)
result = self.__process(spill)
# Scalar, for validation.
expected = Scaler()
expected.add_value(1)
self.__check_result(result, "process_spill", current_time, expected)
def test_process_multiple_spills(self):
"""
Test "process" with multiple JSON documents so to cycle
past the window size of the average of the 10 most recent
values.
@param self Object reference.
"""
# Scalar, for validation.
expected = Scaler()
for i in range(0, 14):
expected.add_value(i)
event = "multiple_spills %d" % i
current_time = time.time()
spill = ReducePyScalersTableTestCase.__get_spill( \
event, current_time, i)
result = self.__process(spill)
self.__check_result(result, event, current_time, expected)
# Send down an end of run.
end_of_run = {"daq_data":None, "daq_event_type":"end_of_run", \
"run_num":1, "spill_num":-1}
result = self.__process(end_of_run)
self.__check_result(result, event, current_time, expected)
def test_end_of_run(self):
"""
Test "process" with a JSON document which is an end_of_run.
@param self Object reference.
"""
end_of_run = {"daq_data":None, "daq_event_type":"end_of_run", \
"run_num":1, "spill_num":-1}
result = self.__process(end_of_run)
self.__check_result(result, "", None, Scaler())
result = self.__process(end_of_run)
self.assertEquals({}, result, "Unexpected spill after end_of_run")
def __check_result(self, result, event, time_stamp, expected): # pylint: disable=R0913, C0301
"""
Check result spill for process.
@param self Object reference.
@param result spill to validate.
@param event Event ID to put in spill.
@param time_stamp Time stamp to put in spill.
@param expected Scaler containing expected results.
"""
self.assertTrue("keywords" in result, "No keywords")
self.assertTrue("table_headings" in result, "No table headings")
headings = result["table_headings"]
self.assertEquals("Average of last %d values" \
% expected.get_recent_window(),
headings[2],
"Unexpected heading")
self.assertTrue("description" in result, "No description")
if (time_stamp != None):
time_str = datetime.fromtimestamp(time_stamp)
else:
time_str = ""
description = "Scaler counts from channel data for event: %s at time: %s" % (event, time_str) # pylint: disable=C0301
self.assertEquals(description, result["description"],
"Unexpected description\n "+description+"\n "+result["description"])
self.assertTrue("table_data" in result, "No data")
data = result["table_data"]
self.assertEquals(6, len(data), "Unexpected number of rows")
for i in range(0, 6):
row = data[i]
self.assertEquals(expected.get_recent_value(),
row[1], "Unexpected recent value")
self.assertEquals(expected.get_recent_average(),
row[2], "Unexpected recent average")
self.assertEquals(expected.get_average(),
row[3], "Unexpected average")
def tearDown(self):
"""
Invoke "death".
@param self Object reference.
"""
success = self.__reducer.death()
if not success:
raise Exception('Test setUp failed', 'reducer.death() failed')
@classmethod
def tearDownClass(self): # pylint: disable=C0202
"""
Set the worker to None.
@param self Object reference.
"""
self.__reducer = None
if __name__ == '__main__':
unittest.main()