# A parser for the iRODS XML-like protocol.
# The interface aims to be compatible with xml.etree.ElementTree,
# at least for the features used by python-irodsclient.


class Element(object):
    """
    Represents <name>body</name>.

    (Where `body' is either a string or a list of sub-elements.)
    """

    @property
    def tag(self):
        """Alias for `name' (ElementTree calls this attribute `tag')."""
        return self.name

    def __init__(self, name, body):
        """Initialize with the tag's name and the body (i.e. content).

        A string body has its entities decoded and is exposed as `text'.
        A list body (sub-elements, possibly none) leaves `text' set to None.
        """
        if isinstance(body, list):
            # Empty element or element with children: no text content.
            self.text = None
        else:
            # String element: decode body.
            body = decode_entities(body)
            self.text = body

        self.name = name
        self.body = body

    def find(self, name):
        """Get first matching child element by name (None if no match)."""
        return next(self.findall_(name), None)

    def findall(self, name):
        """Get matching child elements by name."""
        return list(self.findall_(name))

    def findall_(self, name):
        """Get matching child elements by name (generator variant)."""
        if not isinstance(self.body, list):
            # A string-bodied element has no children to search.
            return iter(())
        return (el for el in self.body if el.name == name)

    # For debugging convenience:

    def __str__(self):
        """Serialize back to <name>body</name> form."""
        if isinstance(self.body, list):
            content = ''.join(map(str, self.body))
        else:
            content = encode_entities(self.body)
        return '<{}>{}</{}>'.format(self.name, content, self.name)

    def __repr__(self):
        return '{}({})'.format(self.name, repr(self.body))


class Token(object):
    """A utility class for parsing XML."""

    def __init__(self, s):
        """Create a `Token' object from `s', the text comprising the parsed token."""
        self.text = s

    def __repr__(self):
        # `text' is decoded as UTF-8 here, so it is expected to be bytes.
        return str(type(self).__name__) + '(' + self.text.decode('utf-8') + ')'

    def __str__(self):
        return repr(self)


class TokenTagOpen(Token):
    """An opening tag (<foo>)"""


class TokenTagClose(Token):
    """A closing tag (</foo>)"""


class TokenCData(Token):
    """Textual element body"""


class QuasiXmlParseError(Exception):
    """Indicates parse failure of XML protocol data."""


def tokenize(s):
    """Parse an XML-ish string into a list of tokens.

    `s' is a bytes object.  Returns a list of TokenTagOpen, TokenTagClose and
    TokenCData objects whose `text' is the tag name or character data (bytes).
    Raises QuasiXmlParseError on an unterminated tag or on character data
    that is not followed by a tag.
    """
    tokens = []

    # Consume input until empty.
    while True:
        nextclose = s.find(b'</')
        nextopen = s.find(b'<')
        # NOTE(review): this whitespace rule was rebuilt from a damaged
        # source; confirm it against the upstream implementation.
        if nextopen < nextclose or nextopen == -1:
            # Either we have no tags left, or the next tag is an opening tag,
            # i.e. we are in a non-cdata element body: strip whitespace.
            # (When the next tag is a closing tag, whitespace is cdata.)
            s = s.lstrip()

        if len(s) == 0:
            return tokens

        # Closing tag?
        elif s.startswith(b'</'):
            try:
                name, s = s[2:].split(b'>', 1)
            except ValueError:
                raise QuasiXmlParseError('protocol error: unterminated close tag')
            tokens.append(TokenTagClose(name))
            s = s.lstrip()  # consume space after closing tag

        # Opening tag?
        elif s.startswith(b'<'):
            try:
                name, s = s[1:].split(b'>', 1)
            except ValueError:
                raise QuasiXmlParseError('protocol error: unterminated open tag')
            tokens.append(TokenTagOpen(name))

        else:
            # capture cdata till next tag.
            try:
                cdata, s = s.split(b'<', 1)
            except ValueError:
                raise QuasiXmlParseError('protocol error: unterminated cdata')
            s = b'<' + s  # put back the delimiter consumed by split()
            tokens.append(TokenCData(cdata))


def fromtokens(tokens):
    """Parse XML-ish tokens into an Element.

    `tokens' is a sequence as produced by tokenize().  Raises
    QuasiXmlParseError if the tokens do not form exactly one well-nested
    element.
    """
    count = len(tokens)

    def parse_elem(pos):
        """Parse one Element starting at index `pos'.

        Returns the Element and the index of the first unconsumed token.
        """
        if pos >= count or not isinstance(tokens[pos], TokenTagOpen):
            raise QuasiXmlParseError('protocol error: data does not start with open tag')
        topen = tokens[pos]
        pos += 1

        children = []
        cdata = None

        while pos < count:
            t = tokens[pos]
            if isinstance(t, TokenTagOpen):
                # Slurp a sub-element, then continue with non-consumed tokens.
                el, pos = parse_elem(pos)
                children.append(el)
            elif isinstance(t, TokenTagClose):
                if t.text != topen.text:
                    raise QuasiXmlParseError('protocol error: close tag <{}> does not match opening tag <{}>'.format(t.text, topen.text))
                elif cdata is not None and len(children):
                    raise QuasiXmlParseError('protocol error: mixed cdata and child elements')
                body = cdata.decode('utf-8') if cdata is not None else children
                return Element(topen.text.decode('utf-8'), body), pos + 1
            else:
                cdata = t.text
                pos += 1

        # Ran out of tokens before the matching close tag was seen.
        raise QuasiXmlParseError('protocol error: missing close tag for <{}>'.format(topen.text))

    elem, end = parse_elem(0)

    if end != count:
        raise QuasiXmlParseError('protocol error: trailing data')

    return elem


try:
    unicode         # Python 2
except NameError:
    unicode = str


def fromstring(s):
    """Parse protocol data into an Element.

    Accepts bytes, or a text string (which is encoded as UTF-8 first).
    Raises TypeError for any other type and QuasiXmlParseError for
    malformed data.
    """
    if isinstance(s, unicode):
        s = s.encode('utf-8')

    if not isinstance(s, bytes):
        raise TypeError('expected a bytes-object, got {}'.format(type(s).__name__))

    return fromtokens(tokenize(s))


def encode_entities(s):
    """Replace each active entity's first member with its second in `s'."""
    # Imported at call time, as in the original code.
    # NOTE(review): presumably yields (raw, escaped) pairs -- confirm.
    from . import XML_entities_active
    for k, v in XML_entities_active():
        s = s.replace(k, v)
    return s


def decode_entities(s):
    """Inverse of encode_entities(): apply the pairs reversed, in reverse order."""
    from . import XML_entities_active
    rev = list(XML_entities_active())
    rev.reverse()  # (make sure & is decoded last)
    for k, v in rev:
        s = s.replace(v, k)
    return s