import numarray.records def normalize_format(fmt): """Normalize format to follow numarray conventions.""" # Remove shape '()' at the forefront which is equivalent to an scalar if fmt[:2] == '()': fmt = fmt[2:] # Accept 'S' as a synonym of 'a' if fmt.find('S') >= 0: fmt = fmt.replace('S', 'a') return fmt def getIter(object): """Return an iterator (if any) for object """ iterator = None try: iterator = iter(object) except TypeError: pass return iterator # # Methods for flatten the buffer structure descriptions # def flattenDescr(descr, check=False): """Flatten a descr description of a buffer. Names of nested fields are returned as a level1/level2/.../levelN path. If ``check`` is True the function returns None when it finds some element with an incorrect format. This is not strictely necessary, but it is useful for testing purposes. """ i = getIter(descr) if not i: return try: item = i.next() while item: if isinstance(item, tuple) and len(item) == 2 and \ (isinstance(item[1], str) or isinstance(item[1], list)) and \ isinstance(item[0], str): if isinstance(item[1], str): yield item else: for c in flattenDescr(item[1], check): if c == None: yield c else: name = '%s/%s' % (item[0], c[0]) yield (name, c[1]) else: if check: yield None item = i.next() except StopIteration: pass def flattenFormats(formats, check=False): """Flatten a formats description of a buffer. If ``check`` is True the function returns None when it finds some element with an incorrect format, i.e. an element that is neither a string nor a sequence. This is not strictely necessary, but it is useful for testing purposes. """ i = getIter(formats) if not i: return try: item = i.next() while item: if isinstance(item, str): yield normalize_format(item) elif isinstance(item, list) or isinstance(item, tuple): for c in flattenFormats(item, check): yield c else: if check: yield None item = i.next() except StopIteration: pass def flattenNames(names, check=False): """Flatten a names description of a buffer. Names of nested fields are returned with its full path, i.e. level1/level2/.../levelN. If ``check`` is True the function returns None when it finds some element with an incorrect format, i.e. an element that is neither a string nor a 2-tuple. This is not strictely necessary, but it is useful for testing purposes. """ i = getIter(names) if not i: return try: item = i.next() while item: if isinstance(item, str): yield item elif isinstance(item, tuple) and len(item) == 2\ and isinstance(item[0], str) and isinstance(item[1], list): for c in flattenNames(item[1], check): if c == None: yield c else: yield '%s/%s' % (item[0], c) else: if check: yield None item = i.next() except StopIteration: pass # # Methods to get a given description list from another one # def getDescr(names, formats): """Create a descr description by mixing formats and names lists. This method assumes that names and formats descriptions structure are good (i.e. that checkNames and checkFormats nesterecords methods raised no errors). """ if not names: names = [item for item in makeNamesFromFormats(formats)] if type(formats) == str and type(names) == str: yield (names, formats) raise StopIteration if len(formats) != len(names): raise ValueError("""The formats and names structure don't match!""") mix = zip(names, formats) i = getIter(mix) if not i: return try: (name, fmt) = i.next() while (name, fmt): if isinstance(name, str) and isinstance(fmt, str): yield (name, fmt) else: l = [] for (a, b) in getDescr(name[1], fmt): l.append((a,b)) yield (name[0], l) (name, fmt) = i.next() except StopIteration: pass def makeNamesFromFormats(formats): """Create a names description from a formats one. Field names are generated automatically as c1, c2 and so on. This method assumes that formats description structure is good (i.e. that nestedrecords.checkFormats method raised no errors). """ i = getIter(formats) if not i: return try: c = 0 item = i.next() while item: c = c +1 name = 'c%s' % c if isinstance(item, str): yield name else: l = [] for a in makeNamesFromFormats(item): l.append(a) yield (name, l) item = i.next() except StopIteration: pass def getNamesFromDescr(descr): """Extract field names from a description sequence. Given a descr sequence, this method retrieves the embeded names list. This method assumes that descr description structure is good (i.e. that nestedrecords.checkDescr method raised no errors). """ i = getIter(descr) if not i: return try: item = i.next() while item: if isinstance(item[1], str): yield item[0] else: l = [] for j in getNamesFromDescr(item[1]): l.append(j) r = (item[0], l) yield r item = i.next() except StopIteration: pass def getFormatsFromDescr(descr): """Extract field formats from a description sequence. Given a descr sequence, this method retrieves the embeded formats list. This method assumes that descr description structure is good (i.e. that nestedrecords.checkDescr method raised no errors). """ i = getIter(descr) if not i: return try: item = i.next() while item: item1 = item[1] if isinstance(item1, str): yield normalize_format(item1) else: l = [] for j in getFormatsFromDescr(item1): l.append(j) yield l item = i.next() except StopIteration: pass # Methods to deal with descr description def getFieldDescr(fieldName, descr): """Retrieve the descr list corresponding to a given field. For nested fields the fieldName is passed as x/y... This method assumes that descr description structure is good (i.e. that nestedrecords.checkDescr method raised no errors). """ i = getIter(descr) if not i: return try: sw = '' item = i.next() while item: if fieldName == item[0]: yield item break if isinstance(item[1], list): if fieldName.startswith('%s/' %item[0]): sw = item[0] else: item = i.next() continue [trash, newField] = fieldName.split(sw + '/') for c in getFieldDescr(newField, item[1]): sw = '%s/%s' % (sw, c[0]) yield (sw, c[1]) item = i.next() except StopIteration: pass # # Methods to deal with the names description # def getSubNames(names): """Retrieve the list of all names and sub-names in a names list. For nested fields all sub-field names are returned. For instance, a field x/y/z will be returned as x, y, z. This method is used to check that field names don't contain '/' characters. This method assumes that names description structure is good (i.e. that nestedrecords.checkNames method raised no errors). """ i = getIter(names) if not i: return try: item = i.next() while item: if isinstance(item, str): yield item else: ## elif isinstance(item, tuple) and len(item) == 2: yield item[0] for c in getSubNames(item[1]): yield c item = i.next() except StopIteration: pass def checkNamesUniqueness(names): """At every level of the names description check that names are unique. This method assumes that names description structure is good (i.e. that nestedrecords.checkNames method raised no errors.). This is a recursive function but it doesn't use generators. """ topNames, deeperNames = getLevelNames(names) ## print topNames ## print deeperNames for name in topNames[:-1]: if topNames.count(name) > 1: raise ValueError("""\Names at every level must be unique!""") if deeperNames: checkNamesUniqueness(deeperNames) def getLevelNames(names): """Retrieve the list of names in a given level. This method assumes that names description structure is good (i.e. that nestedrecords.checkNames method raised no errors.). """ topNames = [] deeperNames = [] for item in names: if isinstance(item, str): topNames.append(item) else: topNames.append(item[0]) # Names immediately under the current level must be # qualified with the current level full name for j in item[1]: if isinstance(j, str): subname = '%s/%s' % (item[0], j) else: # j is a 2-tuple jlist = list(j) jlist[0] = '%s/%s' % (item[0], jlist[0]) subname = tuple(jlist) deeperNames.append( subname) return topNames, deeperNames # # Methods to check the correctness of the buffer structure # def zipBufferDescr(row, structure): """Zip a buffer row with its `descr` description. This function is used to check if buffers have a consistent format. This is done by applying the function on every row of the buffer. The function zips the buffer row with the buffer descr description in a recursive way, till a flat list of 2-tuples is obtained. Each 2-tuple contains the value of a field and its description. Recursion is needed to deal with nested fields of the buffer. This method assumes that descr description structure is good (i.e. that nestedrecords.checkDescr method raised no errors.). """ ## print 'row **', row.__class__, row ## print 'structure **', structure if len(row) != len(structure): raise ValueError("""The row structure doesn't match that provided"""\ """ by the format specification""") mix = zip(row, structure) i = getIter(mix) if not i: return try: (value, descr) = i.next() while (value, descr): fmt = descr[1] if isinstance(fmt, str): yield (value, fmt) else: for (a, b) in zipBufferDescr(value, descr[1]): yield (a, b) (value, descr) = i.next() except StopIteration: pass def flattenArraysList(array_list, descr, flat_array_list): """Flatten a buffer made of arrays list. """ if isinstance(array_list, numarray.records.RecArray): raise TypeError("""``arrayList`` cannot be a recarray""") if isinstance(descr, tuple) and len(descr) == 2 and \ isinstance(descr[1], str) and isinstance(descr[0], str): flat_array_list.append(array_list) elif isinstance(descr, tuple) and len(descr) == 2 and \ isinstance(descr[1], list) and isinstance(descr[0], str): for sdpos, sdescr in enumerate(descr[1]): flattenArraysList(array_list[sdpos], sdescr, flat_array_list) elif isinstance(descr, list): for sdpos, sdescr in enumerate(descr): flattenArraysList(array_list[sdpos], sdescr, flat_array_list)