#!/usr/bin/env python
# This file is part of MAUS: http://micewww.pp.rl.ac.uk/projects/maus
#
# MAUS is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MAUS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MAUS. If not, see .
#
"""
Provides an implementation of a covariance matrix calculator.
Doesn't store individual hits - so no limitations due to memory usage.
Also includes a correction matrix calculator which analyses the differences
between an MC and Reconstructed hits to produce the systematic corrections
to the reconstructed covariance matrix; in additiona to some useful functions.
"""
# pylint: disable = W0311, E1101, W0102, R0902, C0103, R0904
import numpy
import math
from analysis.tools import MUON_MASS
import analysis.hit_types
VARIABLE_LIST = [ 'x', 'px', 'y', 'py', 'z', 'pz', 't', 'E' ]
POSITION_VARIABLES = [ 'x', 'y', 'z', 't' ]
MOMENTUM_VARIABLES = [ 'px', 'py', 'pz', 'E' ]
CONJUGATE_PAIRS = { 'x':'px', 'y':'py', 'z':'pz', 't':'E', \
'px':'x', 'py':'y', 'pz':'z', 'E':'t' }
VARIABLE_ENUMERATION = {}
for var_num, var in enumerate(VARIABLE_LIST) :
VARIABLE_ENUMERATION[var] = var_num
def check_axes( axes, master_list=None ) :
"""
Checks a list of axes against the 'VARIABLE_LIST' variable.
Throws an exception if there is an incompatibility.
"""
if master_list == None :
master_list = VARIABLE_LIST
for axis in axes :
if axis not in master_list :
raise KeyError( 'Could not find axis "' + str( axis ) + \
'" in the allowed variables.' )
def emittance_from_matrix( matrix, mass = None ) :
"""
Calculate the emittance from a 2D covariance matrix. Requires the matrix
to be square and have dimension 2nx2n for integer n.
If no mass is provided, the muon mass is used by default.
"""
if mass is None :
mass = MUON_MASS
if ( len( matrix ) % 2.0 ) != 0.0 :
raise ArithmeticError( 'A Matrix with even numbers of rows and columns is \
Required to calculate an Emittance.' )
if isinstance( matrix, numpy.matrix ) :
shape = matrix.shape
if ( shape[0] != shape[1] ) :
print len( matrix ), len( matrix[0] ), map( len, matrix )
raise ArithmeticError( 'A 2D Square Matrix is \
Required to calculate an Emittance.' )
elif len( matrix ) != len( matrix[0] ) :
print len( matrix ), len( matrix[0] ), map( len, matrix )
raise ArithmeticError( 'A 2D Square Matrix is \
Required to calculate an Emittance.' )
dim = len( matrix )
return ( numpy.linalg.det( matrix ) ** ( 1.0 / dim ) ) / mass
def get_conjugates( axes ) :
"""
Returns a list of axis labels, corresponding to the conjuate labels in the
supplied list.
"""
check_axes( axes )
new_list = []
for axis in axes :
new_list.append( CONJUGATE_PAIRS[axis] )
return new_list
class CovarianceMatrix() :
"""
Performs running calculations of a covariance matrix to allow for larger
datasets to be analysed with a smaller memory footprint. Running sums and
totals are stored, to be combined when necessary to form the final
covariance matrix.
Uses the whole 6D phase space, however the user may request subsets.
"""
def __init__( self ) :
"""
Initialse the object
Set counters to zero, and values to zero.
"""
self._numVars = len( VARIABLE_LIST )
self._mean_vector = numpy.zeros( self._numVars )
self._product_matrix = numpy.zeros( ( self._numVars, self._numVars ) )
self._mean_momentum = 0.0
self._num_particles = 0.0
self._correction_matrix = None
def length( self ) :
"""
Return the number of particles added to the matrix
"""
return self._num_particles
def save_covariance( self, filename, axes = [ 'x', 'px', 'y', 'py', 'z', 'pz', 't', 'E' ] ) :
"""
Save the covariance matrix to a file called .
"""
cov = self.get_covariance_matrix(axes)
numpy.savetxt(filename, cov)
def clear( self ) :
"""
Set our two matrices to zeros.
As if nothing had happened!
"""
self._mean_vector = numpy.zeros( self._numVars )
self._product_matrix = numpy.zeros( ( self._numVars, self._numVars ) )
self._mean_momentum = 0.0
self._num_particles = 0.0
def add_hit_scifi( self, scifi_hit ) :
"""
Add a scifi hit and extract the information into the class.
Accepts both xboa type hits and the local emittance analysis type hits.
"""
self.add_hit( analysis.hit_types.AnalysisHit(scifi_track_point=scifi_hit) )
def add_hit( self, hit ) :
"""
Add a hit and extract the information into the class.
Accepts both xboa type hits and the local emittance analysis type hits.
"""
weight = hit.get_weight()
for row, rowvar in enumerate( VARIABLE_LIST ) :
for col, colvar in enumerate( VARIABLE_LIST ) :
self._product_matrix[row][col] += hit.get( rowvar ) *\
hit.get( colvar ) * weight**2.0
self._mean_vector[row] += hit.get( rowvar ) * weight
self._mean_momentum += hit.get_p()*weight
self._num_particles += 1.0*weight
def add_dict( self, dictionary ) :
"""
Add a hit by specifying the individual components in the 6D phase space
in a dictionary of form { 'x': x, 'y': y ... }
All variables in the 'VARIABLE_LIST' list must be provided.
"""
for key in VARIABLE_LIST.keys :
if key not in dictionary.keys :
raise KeyError( str( key ) + ' not found in supplied dictionary.' )
for row, rowvar in enumerate( VARIABLE_LIST ) :
for col, colvar in enumerate( VARIABLE_LIST ) :
self._product_matrix[row][col] += dictionary[ rowvar ] *\
dictionary[ colvar ]
self._mean_vector[row] += dictionary[rowvar]
self._mean_momentum += math.sqrt(dictionary['px']**2 + \
dictionary['py']**2 + dictionary['pz']**2)
self._num_particles += 1
def set_covariance_correction( self, correction, axes ) :
"""
Store a correction matrix to be applied to the determinant calculations.
'axes' must describe the rows and columns of the matrix.
"""
check_axes( axes )
if correction.shape != (len(axes), len(axes)) :
raise ValueError('Expected square correction matrix that '+\
'matches the axes')
self._correction_matrix = numpy.zeros( (len( VARIABLE_LIST ), \
len( VARIABLE_LIST )) )
for covrow, rowvar in enumerate( axes ) :
row = -1
for num, test in enumerate( VARIABLE_LIST ) :
if rowvar == test :
row = num
for covcol, colvar in enumerate( axes ) :
col = -1
for num, test in enumerate( VARIABLE_LIST ) :
if colvar == test :
col = num
self._correction_matrix[row][col] = correction[covrow][covcol]
def get_covariance_correction( self, axes ) :
"""
Returns the covariance matrix correction formatted for the supplied axes
"""
check_axes( axes )
correction = numpy.zeros( (len( axes ), len( axes )) )
for covrow, rowvar in enumerate( axes ) :
row = -1
for num, test in enumerate( VARIABLE_LIST ) :
if rowvar == test :
row = num
for covcol, colvar in enumerate( axes ) :
col = -1
for num, test in enumerate( VARIABLE_LIST ) :
if colvar == test :
col = num
correction[covrow][covcol] = self._correction_matrix[row][col]
return correction
def get_covariance_matrix( self, axes = [ 'x', 'px', 'y', 'py' ] ) :
"""
Combines the vector and matrix to form the covariance matrix and returns
it.
"""
check_axes( axes )
if self._num_particles == 0 :
raise ZeroDivisionError( 'No particles found. Cannot divide by zero.' )
cov_matrix = numpy.empty( ( len( axes ), len( axes ) ) )
for covrow, rowvar in enumerate( axes ) :
row = -1
for num, test in enumerate( VARIABLE_LIST ) :
if rowvar == test :
row = num
for covcol, colvar in enumerate( axes ) :
col = -1
for num, test in enumerate( VARIABLE_LIST ) :
if colvar == test :
col = num
cov_matrix[covrow][covcol] = ( self._product_matrix[row][col] - \
( self._mean_vector[row] * self._mean_vector[col] ) /\
self._num_particles ) / self._num_particles
return cov_matrix
def get_corrected_covariance_matrix( self, axes = [ 'x', 'px', 'y', 'py' ] ) :
"""
Applies the stored covariance matrix correction to the covariance matrix
"""
if self._correction_matrix is None :
return self.get_covariance_matrix( axes )
cov = self.get_covariance_matrix( axes )
cor = self.get_covariance_correction( axes )
return cov + cor
def get_component( self, axes = [ 'x', 'x' ] ) :
"""
Returns a single component from the covariance matrix. Length of axes
must therefore be two.
"""
check_axes( axes )
if len( axes ) != 2 :
raise ValueError( "Must supply two axis labels to obtain a \
single component" )
row = VARIABLE_ENUMERATION[axes[0]]
col = VARIABLE_ENUMERATION[axes[1]]
return ( self._product_matrix[row][col] - \
( self._mean_vector[row] * self._mean_vector[col] ) /\
self._num_particles ) / self._num_particles
def get_determinant( self, axes = [ 'x', 'px', 'y', 'py' ] ) :
"""
Returns the determinant of the covariace matrix
"""
determinant = numpy.linalg.det( self.get_corrected_covariance_matrix(axes) )
if math.isnan(determinant) :
determinant = 0.0
return determinant
def get_momentum( self ) :
"""
Returns the averaged value of momentum for the ensemble of particles
"""
if self._num_particles > 0 :
return self._mean_momentum / self._num_particles
else :
return 0.0
def get_emittance( self, axes = [ 'x', 'px', 'y', 'py' ], mass = None ) :
"""
Calculates the emittance of covariance matrix.
Requires an even number of axes to be provided, defaults to 4D
transverse emittance.
Emittance calculated as:
( Det( Cov ) ** ( 1 / len( axes ) ) ) / mass
"""
check_axes( axes )
if len( axes ) % 2 != 0 :
raise FloatingPointError( 'Must supply and even number of axes to \
calculate an emittance' )
if mass == 0 :
raise ZeroDivisionError( 'Cannot divide by a mass of zero.' )
if mass is None :
mass = MUON_MASS
power = 1.0 / float( len( axes ) )
return ( self.get_determinant( axes )**power ) / mass
def get_alpha( self, axes = ['x', 'y'], mass = None, emitt = None ) :
"""
Calculats a alpha function for the axes specified.
The axes provided should be positional axes only, e.g. ['x', 'y']
Beta function calculated as:
( Sum( Axis Covariances ) / ( emittance * mass * len( axes ) )
"""
check_axes( axes, master_list=POSITION_VARIABLES )
if mass == 0.0 :
raise ZeroDivisionError( 'Cannot divide by a mass of zero.' )
if mass is None :
mass = MUON_MASS
num = len( axes )
conjugates = get_conjugates( axes )
full_axes = axes + conjugates
if emitt is None :
emitt = self.get_emittance( mass=mass, axes=full_axes )
cov_mat = self.get_corrected_covariance_matrix( full_axes )
covariance_sum = 0.0
for i in range( num ) :
covariance_sum += cov_mat[i][i+num]
return - ( covariance_sum / num ) / ( emitt * mass )
def get_beta( self, axes = ['x', 'y'], mass = None, momentum = None, \
emitt = None ) :
"""
Calculats a beta function for the axes specified.
The axes provided should be positional axes only, e.g. ['x', 'y']
Beta function calculated as:
( Sum( Axis Variances ) * momentum / ( emittance * mass * len( axes ) )
"""
check_axes( axes, master_list=POSITION_VARIABLES )
if mass == 0.0 :
raise ZeroDivisionError( 'Cannot divide by a mass of zero.' )
if mass is None :
mass = MUON_MASS
if momentum is None :
momentum = self._mean_momentum / self._num_particles
# raise ValueError( 'Please provide a value of mean beam momentum' )
if momentum <= 0.0 :
raise ValueError( 'Expected positive value of momentum' )
num = len( axes )
cov_mat = self.get_corrected_covariance_matrix( axes )
full_axes = axes + get_conjugates( axes )
if emitt is None :
emitt = self.get_emittance( mass=mass, axes=full_axes )
variance_sum = 0.0
for i in range( len( axes ) ) :
variance_sum += cov_mat[i][i]
return ( variance_sum / num ) * ( momentum / ( emitt * mass ) )
def get_gamma( self, axes = ['px', 'py'], mass = None, momentum = None, \
emitt = None ) :
"""
Calculats gamma for the axes specified.
The axes provided should be momentum axes only, e.g. ['px', 'py']
Beta function calculated as:
( Sum( Axis Variances ) * momentum / ( emittance * mass * len( axes ) )
"""
check_axes( axes, master_list=MOMENTUM_VARIABLES )
if mass == 0.0 :
raise ZeroDivisionError( 'Cannot divide by a mass of zero.' )
if mass is None :
mass = MUON_MASS
if momentum is None :
momentum = self._mean_momentum / self._num_particles
# raise ValueError( 'Please provide a value of mean beam momentum' )
if momentum <= 0.0 :
raise ValueError( 'Expected positive value of momentum' )
num = len( axes )
cov_mat = self.get_corrected_covariance_matrix( axes )
full_axes = axes + get_conjugates( axes )
if emitt is None :
emitt = self.get_emittance( mass=mass, axes=full_axes )
variance_sum = 0.0
for i in range( len( axes ) ) :
variance_sum += cov_mat[i][i]
return ( variance_sum / num ) * ( 1.0 / ( momentum * emitt * mass ) )
def get_canonical_angular_momentum( self, axes = ['x', 'y'] ) :
"""
Calculats the canonical angular momentum for the axes specified.
The axes provided should be precisely two positional axes only,
e.g. ['x', 'y']
Canonical Angular Momentum calculated as:
Covariance(
"""
check_axes( axes, master_list=POSITION_VARIABLES )
if len( axes ) != 2 :
raise ValueError( 'Can only determine angular momentum for 2 dimensions' )
conjugates = get_conjugates( axes )
full_axes = axes + conjugates
L = 0.0
x_var = VARIABLE_ENUMERATION[full_axes[0]]
px_var = VARIABLE_ENUMERATION[full_axes[2]]
y_var = VARIABLE_ENUMERATION[full_axes[1]]
py_var = VARIABLE_ENUMERATION[full_axes[3]]
L = (self._product_matrix[x_var][py_var] - \
self._product_matrix[y_var][px_var] ) / \
self._num_particles
return L
def get_angular_momentum( self, axes = ['x', 'y'] ) :
"""
Calculats the canonical angular momentum for the axes specified.
The axes provided should be precisely two positional axes only,
e.g. ['x', 'y']
Canonical Angular Momentum calculated as:
Covariance(
"""
check_axes( axes, master_list=POSITION_VARIABLES )
if len( axes ) != 2 :
raise ValueError( 'Can only determine angular momentum for 2 dimensions' )
conjugates = get_conjugates( axes )
full_axes = axes + conjugates
L = 0.0
x_var = VARIABLE_ENUMERATION[full_axes[0]]
px_var = VARIABLE_ENUMERATION[full_axes[2]]
y_var = VARIABLE_ENUMERATION[full_axes[1]]
py_var = VARIABLE_ENUMERATION[full_axes[3]]
L = (self._product_matrix[x_var][py_var] - \
self._product_matrix[y_var][px_var] ) / \
self._num_particles
return L
def get_rms( self, axes=['x'] ) :
"""
Calculates the geometry average of all the RMS' requested
Calculated as:
( Sum ( Axis Variance ) ) ^ (1/n)
"""
check_axes( axes )
sum_variances = 0.0
for axis in axes :
sum_variances = self.get_component([axis, axis])
return sum_variances**(1.0 / len(axes))
def get_means( self, axes = ['x', 'px', 'y', 'py', 'z', 'pz', 't', 'E' ] ) :
"""
Returns an array of mean values for each of the parameters in the axis
list.
"""
check_axes( axes )
means_vec = {}
for axis in axes :
rownum = VARIABLE_ENUMERATION[axis]
means_vec[axis] = self._mean_vector[rownum] / self._num_particles
return means_vec
def get_mean( self, axis = 'x' ) :
"""
Returns an array of mean values for each of the parameters in the axis
list.
"""
if axis not in VARIABLE_LIST :
raise ValueError( 'Could not find axis with label: '+str(axis) )
rownum = VARIABLE_ENUMERATION[axis]
return self._mean_vector[rownum] / self._num_particles
# for num, test in enumerate( VARIABLE_LIST ) :
# if axis == test :
# return self._mean_vector[num] / self._num_particles
class CorrectionMatrix :
"""
Performs a running calculation of the correct matrix to the covariance
matrix using in the emittance calculations.
Using a subset of the 6D phase space (i.e. x,px,y,py) makes it faster!
Initialise and then throw pairs of recon & MC hits at the class and it
saves the information in an efficient way.
Uses the assumption that a measured hit has paramerters m[i], that are
related to the physical parameters by:
m_i = u_i + d_i
where d is some error. Then the covariance matrix, V is an approximation,
with corrections R & C such that:
V_{true} = V_{meas} - R^T - R - C
"""
def __init__( self ) :
"""
Initialise the class. Sets counters and values all to zero.
Records the full 6D phase space. Users can request a subset for most
operations.
Designed to have a low memory footprint and high speed. Rather than
toring every hit, running totals and sums are used to generate the
matrices when required.
"""
self._counter = 0
self._length = len( VARIABLE_LIST )
self._R_products = numpy.zeros( ( self._length, self._length ) )
self._C_products = numpy.zeros( ( self._length, self._length ) )
self._V_products = numpy.zeros( ( self._length, self._length ) )
self._d_vector = numpy.zeros( ( self._length ) )
self._u_vector = numpy.zeros( ( self._length ) )
self._m_vector = numpy.zeros( ( self._length ) )
def add_hit( self, recon, MC ) :
"""
Adds a pair of hits to the class. The difference in the properties of
the two hits is used to create the correction matrix.
Any hit-class that as a \"get() \" function is supported
"""
deltas = numpy.zeros( ( self._length ) )
for i in range( self._length ) :
deltas[i] = recon.get( VARIABLE_LIST[i] ) - MC.get( VARIABLE_LIST[i] )
for i in range( self._length ) :
self._d_vector[i] += deltas[i]
self._u_vector[i] += MC.get( VARIABLE_LIST[i] )
for i in range( self._length ) :
for j in range( self._length ) :
self._R_products[i][j] += MC.get( VARIABLE_LIST[i] ) * deltas[j]
self._C_products[i][j] += deltas[i] * deltas[j]
self._counter += 1
def __len__( self ) :
"""
Return the total number of hits that have been processed.
"""
return self._counter
def number_hits( self ) :
"""
Return the total number of hits that have been processed.
"""
return self._counter
def get_R_matrix( self, axes = [ 'x', 'px', 'y', 'py', 'z', 'pz' ] ) :
"""
Calculates the R Correction matrix given by:
R_{ij} = cov( u_i, d_j )
Using the axes supplied. Defaults to the 4x4 Transverse case.
"""
check_axes( axes )
R = numpy.zeros( ( len( axes ), len( axes ) ) )
for covrow, rowvar in enumerate( axes ) :
row = -1
for num, test in enumerate( VARIABLE_LIST ) :
if rowvar == test :
row = num
for covcol, colvar in enumerate( axes ) :
col = -1
for num, test in enumerate( VARIABLE_LIST ) :
if colvar == test :
col = num
R[covrow][covcol] = \
( self._R_products[row][col] / float( self._counter ) ) - \
( ( self._u_vector[row] * self._d_vector[col] ) / \
float( self._counter ** 2 ) )
return R
def get_C_matrix( self, axes = [ 'x', 'px', 'y', 'py', 'z', 'pz' ] ) :
"""
Calculates the C Correction matrix given by:
C_{ij} = cov( d_i, d_j )
Using the axes supplied. Defaults to the 4x4 transverse case.
"""
check_axes( axes )
C = numpy.zeros( ( len( axes ), len( axes ) ) )
for covrow, rowvar in enumerate( axes ) :
row = -1
for num, test in enumerate( VARIABLE_LIST ) :
if rowvar == test :
row = num
for covcol, colvar in enumerate( axes ) :
col = -1
for num, test in enumerate( VARIABLE_LIST ) :
if colvar == test :
col = num
C[covrow][covcol] = ( self._C_products[row][col] / \
float( self._counter ) ) -\
( ( self._d_vector[row] * self._d_vector[col] ) / \
float( self._counter ** 2 ) )
return C
def get_full_correction( self, axes = [ 'x', 'px', 'y', 'py' ] ) :
"""
Returns the full correction matrix to be added to the covariance matrix
"""
R = self.get_R_matrix( axes )
C = self.get_C_matrix( axes )
Rt = numpy.matrix.transpose( R )
new_matrix = -R - Rt - C
return new_matrix
def correct_covariance( self, cov_mat, axes = [ 'x', 'px', 'y', 'py' ] ) :
"""
Calculate and apply the three correction matrices to the covariance
matrix.
"""
correction = self.get_full_correction( axes )
new_cov = cov_mat + correction
return new_cov
def clear_all( self ) :
"""
Resets all the data to zeros. As if nothing ever happened!
"""
self._R_products = numpy.zeros( ( self._length, self._length ) )
self._C_products = numpy.zeros( ( self._length, self._length ) )
self._d_vector = numpy.zeros( ( self._length ) )
self._u_vector = numpy.zeros( ( self._length ) )
self._counter = 0
def save_R_matrix( self, filename ) :
"""
Saves the R Matrix to a File to be used or examined later
"""
numpy.savetxt( filename, self.get_R_matrix() )
def save_C_matrix( self, filename ) :
"""
Saves the C Matrix to a File to be used or examined later
"""
numpy.savetxt( filename, self.get_C_matrix() )
def save_full_correction( self, filename ) :
"""
Saves the full correction matrix to a file to be used or examined later
"""
numpy.savetxt( filename, self.get_full_correction() )