#!/cvmfs/neugrid.egi.eu/software/x86_64/c-pack-d788610/linuxbrew/Cellar/python/2.7.11/bin/python # emacs: -*- mode: python-mode; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## # # See COPYING file distributed along with the NiBabel package for the # copyright and license terms. # ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## # Copyright (C) 2011 Christian Haselgrove import sys import os import stat import errno import time import locale import logging import fuse import nibabel as nib import nibabel.dft as dft from optparse import OptionParser, Option uid = os.getuid() gid = os.getgid() encoding = locale.getdefaultlocale()[1] fuse.fuse_python_api = (0, 2) logger = logging.getLogger('nibabel.dft') class FileHandle: def __init__(self, fno): self.fno = fno self.keep_cache = False self.direct_io = False return def __str__(self): return 'FileHandle(%d)' % self.fno class DICOMFS(fuse.Fuse): def __init__(self, *args, **kwargs): self.followlinks = kwargs.pop('followlinks', False) fuse.Fuse.__init__(self, *args, **kwargs) self.fhs = {} return def get_paths(self): paths = {} for study in dft.get_studies(self.dicom_path, self.followlinks): pd = paths.setdefault(study.patient_name_or_uid(), {}) patient_info = 'patient information\n' patient_info = 'name: %s\n' % study.patient_name patient_info += 'ID: %s\n' % study.patient_id patient_info += 'birth date: %s\n' % study.patient_birth_date patient_info += 'sex: %s\n' % study.patient_sex pd['INFO'] = patient_info.encode('ascii', 'replace') study_datetime = '%s_%s' % (study.date, study.time) study_info = 'study info\n' study_info += 'UID: %s\n' % study.uid study_info += 'date: %s\n' % study.date study_info += 'time: %s\n' % study.time study_info += 'comments: %s\n' % study.comments d = {'INFO': study_info.encode('ascii', 'replace')} for series in study.series: series_info = 'series info\n' series_info += 'UID: %s\n' % series.uid series_info += 'number: %s\n' % series.number series_info += 'description: %s\n' % series.description series_info += 'rows: %d\n' % series.rows series_info += 'columns: %d\n' % series.columns series_info += 'bits allocated: %d\n' % series.bits_allocated series_info += 'bits stored: %d\n' % series.bits_stored series_info += 'storage instances: %d\n' % len(series.storage_instances) d[series.number] = {'INFO': series_info.encode('ascii', 'replace'), '%s.nii' % series.number: (series.nifti_size, series.as_nifti), '%s.png' % series.number: (series.png_size, series.as_png)} pd[study_datetime] = d return paths def match_path(self, path): wd = self.get_paths() if path == '/': logger.debug('return root') return wd for part in path.lstrip('/').split('/'): logger.debug("path:%s part:%s" % (path, part)) if part not in wd: return None wd = wd[part] logger.debug('return') return wd def readdir(self, path, fh): logger.info('readdir %s' % (path,)) matched_path = self.match_path(path) if matched_path is None: return -errno.ENOENT logger.debug('matched %s' % (matched_path,)) fnames = [ k.encode('ascii', 'replace') for k in matched_path.keys() ] fnames.append('.') fnames.append('..') return [ fuse.Direntry(f) for f in fnames ] def getattr(self, path): logger.debug('getattr %s' % path) matched_path = self.match_path(path) logger.debug('matched: %s' % (matched_path,)) now = time.time() st = fuse.Stat() if isinstance(matched_path, dict): st.st_mode = stat.S_IFDIR | 0755 st.st_ctime = now st.st_mtime = now st.st_atime = now st.st_uid = uid st.st_gid = gid st.st_nlink = len(matched_path) return st if isinstance(matched_path, str): st.st_mode = stat.S_IFREG | 0644 st.st_ctime = now st.st_mtime = now st.st_atime = now st.st_uid = uid st.st_gid = gid st.st_size = len(matched_path) st.st_nlink = 1 return st if isinstance(matched_path, tuple): st.st_mode = stat.S_IFREG | 0644 st.st_ctime = now st.st_mtime = now st.st_atime = now st.st_uid = uid st.st_gid = gid st.st_size = matched_path[0]() st.st_nlink = 1 return st return -errno.ENOENT def open(self, path, flags): logger.debug('open %s' % (path,)) matched_path = self.match_path(path) if matched_path is None: return -errno.ENOENT for i in range(1, 10): if i not in self.fhs: if isinstance(matched_path, str): self.fhs[i] = matched_path elif isinstance(matched_path, tuple): self.fhs[i] = matched_path[1]() else: raise -errno.EFTYPE return FileHandle(i) raise -errno.ENFILE # not done def read(self, path, size, offset, fh): logger.debug('read') logger.debug(path) logger.debug(size) logger.debug(offset) logger.debug(fh) return self.fhs[fh.fno][offset:offset+size] def release(self, path, flags, fh): logger.debug('release') logger.debug(path) logger.debug(fh) del self.fhs[fh.fno] return def get_opt_parser(): # use module docstring for help output p = OptionParser( usage="%s [OPTIONS] " % os.path.basename(sys.argv[0]), version="%prog " + nib.__version__) p.add_options([ Option("-v", "--verbose", action="count", dest="verbose", default=0, help="make noise. Could be specified multiple times"), ]) p.add_options([ Option("-L", "--follow-links", action="store_true", dest="followlinks", default=False, help="Follow symbolic links in DICOM directory"), ]) return p if __name__ == '__main__': parser = get_opt_parser() (opts, files) = parser.parse_args() if opts.verbose: logger.addHandler(logging.StreamHandler(sys.stdout)) logger.setLevel(opts.verbose > 1 and logging.DEBUG or logging.INFO) if len(files) != 2: sys.stderr.write("Please provide two arguments:\n%s\n" % parser.usage) sys.exit(1) fs = DICOMFS(dash_s_do='setsingle', followlinks=opts.followlinks) fs.parse(['-f', '-s', files[1]]) fs.dicom_path = files[0].decode(encoding) try: fs.main() except fuse.FuseError: # fuse prints the error message sys.exit(1) sys.exit(0) # eof