# -*- coding: utf-8 -*-
# This software and supporting documentation are distributed by
# Institut Federatif de Recherche 49
# CEA/NeuroSpin, Batiment 145,
# 91191 Gif-sur-Yvette cedex
# France
#
# This software is governed by the CeCILL license version 2 under
# French law and abiding by the rules of distribution of free software.
# You can use, modify and/or redistribute the software under the
# terms of the CeCILL license version 2 as circulated by CEA, CNRS
# and INRIA at the following URL "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license version 2 and that you accept its terms.
from brainvisa.processes import *
import shfjGlobals, math
import registration
from brainvisa import anatomist
from brainvisa import quaternion
name = 'Prepare Subject for Anatomical Pipeline'
userLevel = 0
signature = Signature(
'T1mri', ReadDiskItem( "Raw T1 MRI", shfjGlobals.vipVolumeFormats ),
'commissure_coordinates', WriteDiskItem( 'Commissure coordinates','Commissure coordinates'),
'Normalised',Choice('No','MNI from SPM','MNI from Mritotal', 'Marseille from SPM'),
'Anterior_Commissure', Point3D(),
'Posterior_Commissure', Point3D(),
'Interhemispheric_Point', Point3D(),
'Left_Hemisphere_Point', Point3D(),
'allow_flip_initial_MRI', Boolean(),
'remove_older_MNI_normalization', Boolean(),
'older_MNI_normalization',
ReadDiskItem( 'Transform Raw T1 MRI to Talairach-MNI template-SPM',
'Transformation matrix' ),
)
def validation():
anatomist.validation()
class APCReader:
def __init__( self, key ):
self._key = key
def __call__( self, values, process ):
acp = None
if values.commissure_coordinates is not None:
acp = values.commissure_coordinates
#elif values.T1mri:
# acp = ReadDiskItem( 'Commissure coordinates','Commissure coordinates')\
# .findValue( values.T1mri )
result = None
key_mm = self._key + 'mm'
if acp is not None and acp.isReadable():
f = open( acp.fullPath() )
for l in f.readlines():
l = l.split( ':', 1 )
if len(l) == 2 and l[0] == key_mm:
return [ float(i) for i in l[1].split() ]
if len(l) == 2 and l[0] == self._key and values.T1mri is not None:
vs = values.T1mri.get( 'voxel_size' )
if vs:
pos = l[1].split()
if len( pos ) == 3:
result = [ float(i) * j for i,j in zip( pos, vs ) ]
return result
def initialization( self ):
def linknorm( values, process ):
if values.T1mri and values.T1mri.get( 'normalized' ) == 'yes':
return 'MNI from SPM'
return 'No'
self.linkParameters( 'commissure_coordinates', 'T1mri' )
self.Normalised = 'No'
self.setOptional( 'Anterior_Commissure' )
self.setOptional( 'Posterior_Commissure' )
self.setOptional( 'Interhemispheric_Point' )
self.signature[ 'Anterior_Commissure' ].add3DLink( self, 'T1mri' )
self.signature[ 'Posterior_Commissure' ].add3DLink( self, 'T1mri' )
self.signature[ 'Interhemispheric_Point' ].add3DLink( self, 'T1mri' )
self.signature[ 'Left_Hemisphere_Point' ].add3DLink( self, 'T1mri' )
self.linkParameters( 'Anterior_Commissure',
'commissure_coordinates', APCReader( 'AC' ) )
self.linkParameters( 'Posterior_Commissure',
'commissure_coordinates', APCReader( 'PC' ) )
self.linkParameters( 'Interhemispheric_Point',
'commissure_coordinates', APCReader( 'IH' ) )
self.setOptional( 'Left_Hemisphere_Point' )
self.allow_flip_initial_MRI = 0
self.linkParameters( 'Normalised', 'T1mri', linknorm )
self.setOptional( 'older_MNI_normalization' )
self.linkParameters( 'older_MNI_normalization', 'T1mri' )
def execution( self, context ):
ac = []
pc = []
ip = []
lh = []
acmm = self.Anterior_Commissure
pcmm = self.Posterior_Commissure
ipmm = self.Interhemispheric_Point
if self.Normalised == 'No':
atts = shfjGlobals.aimsVolumeAttributes( self.T1mri )
vs = atts[ 'voxel_size' ]
#context.write( 'voxel size: ', vs )
ac = self.Anterior_Commissure
pc = self.Posterior_Commissure
ip = self.Interhemispheric_Point
lh = self.Left_Hemisphere_Point
if not ac or len( ac ) != 3 or not pc or len( pc ) != 3 or not ip or \
len(ip ) != 3:
raise RuntimeError( _t_( 'In non-normalized mode, the 3 points AC, PC '
'and IP are mandatory (in mm)' ) )
def vecproduct( v1, v2 ):
return ( v1[1] * v2[2] - v1[2] * v2[1],
v1[2] * v2[0] - v1[0] * v2[2],
v1[0] * v2[1] - v1[1] * v2[0] )
def dot( v1, v2 ):
return v1[0] * v2[0] + v1[1] * v2[1] + v1[2] * v2[2]
def norm( v ):
return math.sqrt( v[0] * v[0] + v[1] * v[1] + v[2] * v[2] )
def normalize( v ):
nrm = norm(v)
if nrm == 0.:
return v
else:
n = 1. / norm(v)
return ( v[0] * n, v[1] * n, v[2] * n )
def vecscale( v, scl ):
return ( v[0] * scl, v[1] * scl, v[2] * scl )
# determine image orientation
v1 = normalize( ( pc[0] - ac[0], pc[1] - ac[1], pc[2] - ac[2] ) )
ia = ( ac[0] - ip[0], ac[1] - ip[1], ac[2] - ip[2] )
v2u = normalize( ia )
v3 = normalize( vecproduct( v1, v2u ) )
v2 = normalize( vecproduct( v3, v1 ) )
# sanity check
ipdot = dot( v1, ia )
ipdist = norm( ( -ia[0] + ipdot * v1[0], -ia[1] + ipdot * v1[1],
-ia[2] + ipdot * v1[2] ) )
if ipdist == 0:
raise ValueError( _t_( 'AC, PC and IP are aligned, the interhemispheric'\
'plane orientation cannot be determined. Please chose IP upper in ' \
'the brain.') )
if ipdist < 30.:
context.warning( _t_( 'IP is close to the AC-PC axis. This may be ' \
'an error, and in any case leads to a poor precision on the ' \
'determination of the interhemispheric plane. You should better ' \
'chose IP upper in the brain.' ) )
# determine rotation between y axis and v1
y = ( 0, 1, 0 )
n = vecproduct( y, v1 )
cosal = dot( y, v1 )
if norm( n ) < 1e-5:
if cosal > 0:
r1 = quaternion.Quaternion( ( 0, 0, 0, 1 ) )
else:
r1 = quaternion.Quaternion( ( 0, 0, 1, 0 ) ) # flip z
else:
n = normalize( n )
t = vecproduct( y, n )
alpha = math.acos( cosal )
if dot( t, v1 ) < 0:
alpha = -alpha
r1 = quaternion.Quaternion()
r1.fromAxis( n, alpha )
# apply r1 to ( v1, v2, v3 )
v1_1 = r1.transform( v1 )
v2_1 = r1.transform( v2 )
v3_1 = r1.transform( v3 )
# now v1_1 should be aligned on y
# determine rotation between z axis and v2
z = ( 0, 0, 1 )
p = dot( z, v2_1 )
if p > 1.:
p = 1.
elif p < -1.:
p = -1.
alpha = math.acos( p )
q = dot( normalize( vecproduct( z, v2_1 ) ), y )
if q >= 0:
alpha = -alpha
r2 = quaternion.Quaternion()
r2.fromAxis( y, alpha )
# apply r2 to ( v1, v2, v3 )
v1_2 = r2.transform( v1_1 )
v2_2 = r2.transform( v2_1 )
v3_2 = r2.transform( v3_1 )
r3 = r1.compose( r2 )
trans = r3.rotationMatrix()
# check x inversion
if lh is None:
context.warning( _t_( 'Left hemisphere point not specified - X axis ' \
'flip will not be checked' ) )
else:
x = ( 1, 0, 0 )
lvec = r3.transform( ( lh[0] - ac[0], lh[1] - ac[1], lh[2] - ac[2] ) )
if dot( x, lvec ) < 0:
context.write( _t_( 'X is flipped' ) )
trans[0] *= -1
trans[1] *= -1
trans[2] *= -1
trans[3] *= -1
#context.write( 'trans:', trans )
# build binary flip matrix
flipmat = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 ]
dims = atts[ 'volume_dimension' ][:3]
dims2 = ( dims[0] * vs[0], dims[1] * vs[1], dims[2] * vs[2] )
imax = 0
for i in range(1,3):
if abs( trans[i] ) > abs( trans[imax] ):
imax = i
if trans[imax] >= 0:
flipmat[imax] = 1
else:
flipmat[imax] = -1
flipmat[3] = dims2[imax]
imax = 4
for i in range(5,7):
if abs( trans[i] ) > abs( trans[imax] ):
imax = i
if trans[imax] >= 0:
flipmat[imax] = 1
else:
flipmat[imax] = -1
flipmat[7] = dims2[imax%4]
imax = 8
for i in range(9,11):
if abs( trans[i] ) > abs( trans[imax] ):
imax = i
if trans[imax] >= 0:
flipmat[imax] = 1
else:
flipmat[imax] = -1
flipmat[11] = dims2[imax%4]
#context.write( 'flip matrix:', flipmat )
needsflip = False
if flipmat != [ 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1 ]:
context.warning( _t_( 'Flip needed, with matrix:' ) )
context.write( '[', flipmat[0:4] )
context.write( ' ', flipmat[4:8] )
context.write( ' ', flipmat[8:12] )
context.write( ' ', flipmat[12:16], ']' )
needsflip = True
else:
context.write( 'OK, the image seems to be in the correct orientation.' )
if needsflip and not self.allow_flip_initial_MRI:
context.write( 'Image needs to be flipped, but you did not ' \
'allow it, so it won\'t change. Expect problems in ' \
'hemispheres separation, and sulci recognition will ' \
'not work anyway.' )
if needsflip and self.allow_flip_initial_MRI:
def matrixMult( m, p ):
return [ m[0] * p[0] + m[1] * p[1] + m[2] * p[2] + m[3],
m[4] * p[0] + m[5] * p[1] + m[6] * p[2] + m[7],
m[8] * p[0] + m[9] * p[1] + m[10] * p[2] + m[11] ]
fliprot = flipmat[:3] + [ 0 ] + flipmat[4:7] + [0] + flipmat[8:11] \
+ [0, 0, 0, 0, 1 ]
vs2 = map( abs, matrixMult( fliprot, vs ) )
dims = atts[ 'volume_dimension' ][:3]
dims2 = ( dims[0] * vs[0], dims[1] * vs[1], dims[2] * vs[2] )
dims3 = matrixMult( fliprot, dims2 )
dims4 = map( lambda x,y: int( round( abs( x ) / y ) ), dims3, vs2 )
#flip[9] = -min( 0, dims3[0] )
#flip[10] = -min( 0, dims3[1] )
#flip[11] = -min( 0, dims3[2] )
context.write( 'WARNING: Flipping and ' \
're-writing source image' )
context.write( 'voxel size orig :', vs )
context.write( 'voxel size final:', vs2 )
context.write( 'dims orig :', dims )
context.write( 'dims final:', dims4 )
context.write( 'transformation:', flipmat )
mfile = context.temporary( 'Transformation matrix' )
mf = open( mfile.fullPath(), 'w' )
mf.write( string.join( map( str, flipmat[3:12:4] ) ) + '\n' )
mf.write( string.join( map( str, flipmat[:3] ) ) + '\n' )
mf.write( string.join( map( str, flipmat[4:7] ) ) + '\n' )
mf.write( string.join( map( str, flipmat[8:11] ) ) + '\n' )
mf.close()
context.log( 'Transformation',
html = 'transformation: R = ' + str( flipmat[:3] \
+ flipmat[4:7] + flipmat[8:11] ) \
+ ', T = ' + str( flipmat[3:12:4] ) )
context.system( 'AimsResample', '-i', self.T1mri.fullPath(),
'-o', self.T1mri.fullPath(), '-m', mfile.fullPath(),
'--sx', vs2[0], '--sy', vs2[1], '--sz', vs2[2],
'--dx', dims4[0], '--dy', dims4[1], '--dz', dims4[2],
'--type', 'nearest' )
acmm = matrixMult( flipmat, ac )
pcmm = matrixMult( flipmat, pc )
ipmm = matrixMult( flipmat, ip )
context.write( 'new AC:', acmm )
context.write( 'new PC:', pcmm )
context.write( 'new IP:', ipmm )
vs = vs2
# reload image in Anatomist
a = anatomist.Anatomist( create=False ) # test if anatomist is started
if a:
object=a.getObject(self.T1mri.fullPath())
if object is not None:
a.reloadObjects([object])
ac = [ int( acmm[0] / vs[0] +0.5 ), int( acmm[1] / vs[1] +0.5),
int( acmm[2] / vs[2] +0.5) ]
pc = [ int( pcmm[0] / vs[0] +0.5 ), int( pcmm[1] / vs[1] +0.5),
int( pcmm[2] / vs[2] +0.5) ]
ip = [ int( ipmm[0] / vs[0] +0.5 ), int( ipmm[1] / vs[1] +0.5),
int( ipmm[2] / vs[2] +0.5) ]
# normalized case
else:
atts = shfjGlobals.aimsVolumeAttributes( self.T1mri )
refs = atts.get( 'referentials' )
trans = atts.get( 'transformations' )
vs = atts[ 'voxel_size' ]
autonorm = False
if refs and trans:
for i in range( len( refs ) ):
if refs[i] == 'Talairach-MNI template-SPM':
break
if i >= len( refs ):
i = 0
tr = trans[0]
try:
import soma.aims as aims
a2t = aims.Motion( tr )
t2a = a2t.inverse()
acmm = t2a.transform( [ 0, 0, 0 ] )
ac = [ int( acmm[0] / vs[0] ), int( acmm[1] / vs[1] ),
int( acmm[2] / vs[2] ) ]
pcmm = t2a.transform( [ 0, -28, 0 ] )
pc = [ int( pcmm[0] / vs[0] ), int( pcmm[1] / vs[1] ),
int( pcmm[2] / vs[2] ) ]
ipmm = t2a.transform( [ 0, -20, 60 ] )
ip = [ int( ipmm[0] / vs[0] ), int( ipmm[1] / vs[1] ),
int( ipmm[2] / vs[2] ) ]
autonorm = True
except Exception, e:
context.warning( e )
if not autonorm:
if self.Normalised == 'MNI from SPM' :
ac = [ 77, 73, 88 ]
pc = [ 77, 100, 83 ]
ip = [ 76, 60, 35 ]
acmm = ac
pcmm = pc
ipmm = ip
#self.T1mri.setMinf( 'referential',
#registration.talairachMNIReferentialId )
#try:
#self.T1mri.saveMinf()
#except:
#context.warning( 'could not set SPM/MNI normalized '
#'referential to', self.T1mri.fullName() )
elif self.Normalised == 'MNI from Mritotal' :
ac = [ 91, 88, 113 ]
pc = [ 91, 115, 109 ]
ip = [ 90, 109, 53 ]
acmm = ac
pcmm = pc
ipmm = ip
elif self.Normalised == 'Marseille from SPM' :
ac = [ 91, 93, 108 ]
pc = [ 91, 118, 106 ]
ip = [ 91, 98, 68 ]
acmm = ac
pcmm = pc
ipmm = ip
f = open( self.commissure_coordinates.fullPath(), 'w' )
f.write( "AC: " + string.join( map( lambda x:str(x), ac ) ) + '\n' )
f.write( "PC: " + string.join( map( lambda x:str(x), pc ) ) + '\n' )
f.write( "IH: " + string.join( map( lambda x:str(x), ip ) ) + '\n' )
f.write( "The previous coordinates, used by the system, are defined in " \
"voxels\n" )
f.write( "They stem from the following coordinates in millimeters:\n" )
if self.Normalised == 'No':
f.write( "ACmm: " + string.join( map( str, acmm ) ) + '\n' )
f.write( "PCmm: " + string.join( map( str, pcmm ) ) + '\n' )
f.write( "IHmm: " + string.join( map( str, ipmm ) ) + '\n' )
else:
f.write( "ACmm: " + string.join( map( str, ac ) ) + '\n' )
f.write( "PCmm: " + string.join( map( str, pc ) ) + '\n' )
f.write( "IHmm: " + string.join( map( str, ip ) ) + '\n' )
f.close()
# remove older MNI normalization
if self.remove_older_MNI_normalization \
and self.older_MNI_normalization is not None:
try:
db = neuroHierarchy.databases.database( \
self.older_MNI_normalization.get("_database") )
except: # running without databasing
db = None
for f in self.older_MNI_normalization.existingFiles():
os.unlink( f )
if db:
db.removeDiskItem( self.older_MNI_normalization )
# manage referential
tm = registration.getTransformationManager()
ref = tm.referential( self.T1mri )
if ref is None:
tm.createNewReferentialFor( self.T1mri )