class DicomIter(object):
    """Iterate over the DICOM data elements of a file-like object.

    Iteration first yields the file meta information elements in sorted
    tag order, then the elements produced by ``data_element_generator``
    for the remainder of the stream.
    """

    def __init__(self, fp, stop_when=None, force=False):
        """Read the preamble and meta info, prepare iterator for remainder.

        fp -- an open DicomFileLike object, at start of file
        stop_when -- optional callback handed on to data_element_generator
        force -- passed through to read_preamble

        The VR and endianness flags are stored on the iterator itself as
        ``_is_implicit_VR`` and ``_is_little_endian``.  When no preamble
        is found, ``fp.TransferSyntaxUID`` is set to Implicit VR Little
        Endian.
        """
        self.fp = fp
        self.stop_when = stop_when

        preamble = read_preamble(fp, force)
        self.preamble = preamble
        has_header = preamble is not None
        self.has_header = has_header
        self.file_meta_info = Dataset()

        if not has_header:
            # no header -- make assumptions
            fp.TransferSyntaxUID = dicom.UID.ImplicitVRLittleEndian
            implicit_vr = True
            little_endian = True
        else:
            meta = _read_file_meta_info(fp)
            self.file_meta_info = meta
            syntax = meta.TransferSyntaxUID

            # Only two syntaxes deviate from Explicit VR Little Endian:
            # Implicit VR Little Endian and Explicit VR Big Endian.
            # Every other syntax, e.g. all Encapsulated (JPEG etc), is
            # ExplVR-LE by Standard PS 3.5-2008 A.4 (p63).
            implicit_vr = bool(syntax == dicom.UID.ImplicitVRLittleEndian)
            little_endian = not (syntax == dicom.UID.ExplicitVRBigEndian)

            if syntax == dicom.UID.DeflatedExplicitVRLittleEndian:
                # See PS3.6-2008 A.5 (p 71) -- when written, the entire
                # dataset following the file metadata was prepared the
                # normal way, then "deflate" compression applied.
                # Decompress the rest of the stream and carry on reading
                # from an in-memory file-like object.
                # -MAX_WBITS part is from comp.lang.python answer:
                # groups.google.com/group/comp.lang.python/msg/e95b3b38a71e6799
                inflated = zlib.decompress(fp.read(), -zlib.MAX_WBITS)
                self.fp = BytesIO(inflated)  # point to new object

        self._is_implicit_VR = implicit_vr
        self._is_little_endian = little_endian

        vr_name = "Implicit" if self._is_implicit_VR else "Explicit"
        endian_name = "Little" if self._is_little_endian else "Big"
        logger.debug("Using {0:s} VR, {1:s} Endian transfer syntax".format(
            vr_name, endian_name))

    def __iter__(self):
        """Yield file meta elements (sorted by tag), then dataset elements."""
        meta = self.file_meta_info
        for tag in sorted(meta.keys()):
            yield meta[tag]

        remainder = data_element_generator(self.fp,
                                           self._is_implicit_VR,
                                           self._is_little_endian,
                                           stop_when=self.stop_when)
        for element in remainder:
            yield element
def _decode_specific_character_set(value, is_little_endian):
    """Turn a raw Specific Character Set (0008,0005) value into encodings.

    The raw value is converted with ``convert_string`` using the default
    encoding, and the result is passed through ``convert_encodings``.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from dicom.values import convert_string
    charset = convert_string(value, is_little_endian,
                             encoding=default_encoding)
    return convert_encodings(charset)


def data_element_generator(fp, is_implicit_VR, is_little_endian,
                           stop_when=None, defer_size=None,
                           encoding=default_encoding):
    """Create a generator to efficiently return the raw data elements.

    :param fp: file-like object positioned at the start of a data element
    :param is_implicit_VR: True if the transfer syntax is implicit VR
    :param is_little_endian: True if transfer syntax is little endian
    :param stop_when: optional callback ``stop_when(tag, VR, length)``;
        when it returns a true value the file is rewound to the start of
        the current data element and iteration ends.  Note VR is None for
        implicit VR.
    :param defer_size: optional size; the value of a defined-length
        element longer than this is skipped and yielded as None.  It is
        also handed to ``read_undefined_length_value``.
    :param encoding: encoding passed to ``read_sequence`` for undefined
        length sequences; replaced once a Specific Character Set
        (0008,0005) element has been read.

    Yields ``RawDataElement(tag, VR, length, value, value_tell,
    is_implicit_VR, is_little_endian)`` where:
        VR -- None if implicit VR, otherwise the VR read from the file
        length -- the length as in the DICOM data element (could be
            DICOM "undefined length" 0xFFFFFFFF)
        value -- the raw bytes from the DICOM file (not parsed into
            python types), or None if deferred
        value_tell -- file position of the start of the value
    An undefined length sequence is parsed immediately and yielded as a
    ``DataElement`` instead.
    """
    # Summary of DICOM standard PS3.5-2008 chapter 7:
    # If Implicit VR, data element is:
    #    tag, 4-byte length, value.
    #       The 4-byte length can be FFFFFFFF (undefined length)*
    # If Explicit VR:
    #    if OB, OW, OF, SQ, UN, or UT:
    #       tag, VR, 2-bytes reserved (both zero), 4-byte length, value
    #           For all but UT, the length can be FFFFFFFF (undefined length)*
    #   else: (any other VR)
    #       tag, VR, (2 byte length), value
    # * for undefined length, a Sequence Delimitation Item marks the end
    #        of the Value Field.
    # Note, except for the special_VRs, both impl and expl VR use 8 bytes;
    #    the special VRs follow the 8 bytes with a 4-byte length

    # With a generator, state is stored, so we can break down
    #    into the individual cases, and not have to check them again for each
    #    data element

    # Plain int literal: the "L" long suffix is a syntax error on Python 3,
    # and Python 2 compares int and long by value.
    undefined_length = 0xFFFFFFFF

    endian_chr = "<" if is_little_endian else ">"
    if is_implicit_VR:
        element_struct = Struct(endian_chr + "HHL")
    else:  # Explicit VR
        # tag, VR, 2-byte length (or 0 if special VRs)
        element_struct = Struct(endian_chr + "HH2sH")
        extra_length_struct = Struct(endian_chr + "L")  # for special VRs
        extra_length_unpack = extra_length_struct.unpack  # for lookup speed

    # Make local variables so have faster lookup
    fp_read = fp.read
    fp_tell = fp.tell
    logger_debug = logger.debug
    debugging = dicom.debugging
    element_struct_unpack = element_struct.unpack

    while True:
        # Read tag, VR, length, get ready to read value
        bytes_read = fp_read(8)
        if len(bytes_read) < 8:
            # At end of file.  A plain return ends the generator; raising
            # StopIteration here becomes RuntimeError under PEP 479.
            return
        if debugging:
            debug_msg = "{0:08x}: {1}".format(fp_tell() - 8,
                                              bytes2hex(bytes_read))

        if is_implicit_VR:
            # must reset VR each time; could have set last iteration (e.g. SQ)
            VR = None
            group, elem, length = element_struct_unpack(bytes_read)
        else:  # explicit VR
            group, elem, VR, length = element_struct_unpack(bytes_read)
            if in_py3:
                VR = VR.decode(default_encoding)
            if VR in extra_length_VRs:
                bytes_read = fp_read(4)
                length = extra_length_unpack(bytes_read)[0]
                if debugging:
                    debug_msg += " " + bytes2hex(bytes_read)
        if debugging:
            debug_msg = "%-47s (%04x, %04x)" % (debug_msg, group, elem)
            if not is_implicit_VR:
                debug_msg += " %s " % VR
            if length != undefined_length:
                debug_msg += "Length: %d" % length
            else:
                debug_msg += "Length: Undefined length (FFFFFFFF)"
            logger_debug(debug_msg)

        # Positioned to read the value, but may not want to -- check stop_when
        value_tell = fp_tell()
        tag = TupleTag((group, elem))
        if stop_when is not None:
            # XXX VR may be None here!! Should stop_when just take tag?
            if stop_when(tag, VR, length):
                if debugging:
                    logger_debug("Reading ended by stop_when callback. "
                                 "Rewinding to start of data element.")
                rewind_length = 8
                if not is_implicit_VR and VR in extra_length_VRs:
                    rewind_length += 4
                fp.seek(value_tell - rewind_length)
                return  # see PEP 479 note above

        # Reading the value
        # First case (most common): reading a value with a defined length
        if length != undefined_length:
            if defer_size is not None and length > defer_size:
                # Flag as deferred by setting value to None, and skip bytes
                value = None
                logger_debug("Defer size exceeded. "
                             "Skipping forward to next data element.")
                fp.seek(fp_tell() + length)
            else:
                value = fp_read(length)
                if debugging:
                    dotdot = " "
                    if length > 12:
                        dotdot = "..."
                    logger_debug("%08x: %-34s %s %r %s" % (
                        value_tell, bytes2hex(value[:12]), dotdot,
                        value[:12], dotdot))

            # If the tag is (0008,0005) Specific Character Set, keep the
            # encoding in the generator for use with later elements (SQs)
            if tag == (0x08, 0x05):
                encoding = _decode_specific_character_set(value,
                                                          is_little_endian)

            yield RawDataElement(tag, VR, length, value, value_tell,
                                 is_implicit_VR, is_little_endian)

        # Second case: undefined length - must seek to delimiter,
        # unless is SQ type, in which case is easier to parse it, because
        # undefined length SQs and items of undefined lengths can be nested
        # and it would be error-prone to read to the correct outer delimiter
        else:
            # Try to look up type to see if is a SQ
            # if private tag, won't be able to look it up in dictionary,
            #   in which case just ignore it and read the bytes unless it is
            #   identified as a Sequence
            if VR is None:
                try:
                    VR = dictionaryVR(tag)
                except KeyError:
                    # Look ahead to see if it consists of items
                    # and is thus a SQ
                    next_tag = TupleTag(unpack(endian_chr + "HH", fp_read(4)))
                    # Rewind the file
                    fp.seek(fp_tell() - 4)
                    if next_tag == ItemTag:
                        VR = 'SQ'

            if VR == 'SQ':
                if debugging:
                    msg = "{0:08x}: Reading/parsing undefined length sequence"
                    logger_debug(msg.format(fp_tell()))
                seq = read_sequence(fp, is_implicit_VR,
                                    is_little_endian, length, encoding)
                yield DataElement(tag, VR, seq, value_tell,
                                  is_undefined_length=True)
            else:
                delimiter = SequenceDelimiterTag
                if debugging:
                    logger_debug("Reading undefined length data element")
                value = read_undefined_length_value(fp, is_little_endian,
                                                    delimiter, defer_size)

                # If the tag is (0008,0005) Specific Character Set, keep the
                # encoding in the generator for use with later elements (SQs)
                if tag == (0x08, 0x05):
                    encoding = _decode_specific_character_set(
                        value, is_little_endian)

                yield RawDataElement(tag, VR, length, value, value_tell,
                                     is_implicit_VR, is_little_endian)
def read_dataset(fp, is_implicit_VR, is_little_endian, bytelength=None,
                 stop_when=None, defer_size=None,
                 parent_encoding=default_encoding):
    """Return a Dataset instance containing the next dataset in the file.

    :param fp: an opened file object
    :param is_implicit_VR: True if file transfer syntax is implicit VR
    :param is_little_endian: True if file has little endian transfer syntax
    :param bytelength: None to read until end of file or ItemDelimiterTag,
        else a fixed number of bytes to read
    :param stop_when: optional call_back function which can terminate
        reading. See help for data_element_generator for details
    :param defer_size: optional size to avoid loading large elements in
        memory. See help for data_element_generator for details
    :param parent_encoding: optional encoding to use as a default in case
        a Specific Character Set (0008,0005) isn't specified
    :returns: a Dataset instance
    """
    elements_by_tag = {}
    start_pos = fp.tell()
    elements = data_element_generator(fp, is_implicit_VR, is_little_endian,
                                      stop_when, defer_size, parent_encoding)
    # Read data elements. Stop on some errors, but return what was read
    try:
        while True:
            if bytelength is not None:
                if fp.tell() - start_pos >= bytelength:
                    break
            element = next(elements)
            element_tag = element.tag
            # ItemDelimiterTag -- this dataset is an item in a sequence
            if element_tag == (0xFFFE, 0xE00D):
                break
            elements_by_tag[element_tag] = element
    except StopIteration:
        # generator exhausted: nothing more to read
        pass
    except EOFError as details:
        # XXX is this error visible enough to user code with just logging?
        filename = getattr(fp, "name", "")
        logger.error(str(details) + " in file " + filename)
    except NotImplementedError as details:
        logger.error(details)

    return Dataset(elements_by_tag)
def read_sequence(fp, is_implicit_VR, is_little_endian, bytelength, encoding,
                  offset=0):
    """Read and return a Sequence -- i.e. a list of Datasets.

    :param fp: file-like object positioned at the start of the sequence
        value
    :param is_implicit_VR: passed through to read_sequence_item
    :param is_little_endian: passed through to read_sequence_item
    :param bytelength: length of the sequence value in bytes; 0 gives an
        empty Sequence, 0xFFFFFFFF (DICOM "undefined length") reads items
        until read_sequence_item returns None
    :param encoding: passed through to read_sequence_item
    :param offset: added to each item's file position before it is stored
        in the item's ``file_tell`` attribute; also passed through to
        read_sequence_item
    :returns: a Sequence whose ``is_undefined_length`` attribute records
        whether bytelength was 0xFFFFFFFF
    """
    seq = []  # use builtin list to start for speed, convert to Sequence at end
    # Plain int literal: the "L" long suffix is a syntax error on Python 3,
    # and Python 2 compares int and long by value.
    is_undefined_length = (bytelength == 0xFFFFFFFF)
    if bytelength != 0:  # SQ of length 0 possible (PS 3.5-2008 7.5.1a (p.40)
        if is_undefined_length:
            bytelength = None  # no byte limit; stop when an item is None
        fp_tell = fp.tell  # for speed in loop
        fpStart = fp_tell()
        # bytelength is never 0 here, so "is None" is the only unbounded case
        while bytelength is None or fp_tell() - fpStart < bytelength:
            file_tell = fp_tell()
            dataset = read_sequence_item(fp, is_implicit_VR,
                                         is_little_endian, encoding, offset)
            if dataset is None:  # None is returned if hit Sequence Delimiter
                break
            dataset.file_tell = file_tell + offset
            seq.append(dataset)
    seq = Sequence(seq)
    seq.is_undefined_length = is_undefined_length
    return seq
a Dataset""" seq_item_tell = fp.tell() + offset if is_little_endian: tag_length_format = "