""" Parser plugin interface. This module defines the parser plugin interface and contains other related parser support code. The module is mainly useful for those wanting to write a parser that can plugin to rdflib. If you are wanting to invoke a parser you likely want to do so through the Graph class parse method. """ from __future__ import annotations import codecs import os import pathlib import sys from io import BufferedIOBase, BytesIO, RawIOBase, StringIO, TextIOBase, TextIOWrapper from typing import ( IO, TYPE_CHECKING, Any, BinaryIO, List, Optional, TextIO, Tuple, Union, ) from urllib.parse import urljoin from urllib.request import Request, url2pathname from xml.sax import xmlreader import rdflib.util from rdflib import __version__ from rdflib._networking import _urlopen from rdflib.namespace import Namespace from rdflib.term import URIRef if TYPE_CHECKING: from email.message import Message from urllib.response import addinfourl from rdflib.graph import Graph __all__ = [ "Parser", "InputSource", "StringInputSource", "URLInputSource", "FileInputSource", "PythonInputSource", ] class Parser: __slots__ = () def __init__(self): pass def parse(self, source: "InputSource", sink: "Graph") -> None: pass class BytesIOWrapper(BufferedIOBase): __slots__ = ("wrapped", "encoded", "encoding") def __init__(self, wrapped: str, encoding="utf-8"): super(BytesIOWrapper, self).__init__() self.wrapped = wrapped self.encoding = encoding self.encoded = None def read(self, *args, **kwargs): if self.encoded is None: b, blen = codecs.getencoder(self.encoding)(self.wrapped) self.encoded = BytesIO(b) return self.encoded.read(*args, **kwargs) def read1(self, *args, **kwargs): if self.encoded is None: b = codecs.getencoder(self.encoding)(self.wrapped) self.encoded = BytesIO(b) return self.encoded.read1(*args, **kwargs) def readinto(self, *args, **kwargs): raise NotImplementedError() def readinto1(self, *args, **kwargs): raise NotImplementedError() def write(self, *args, **kwargs): raise NotImplementedError() class InputSource(xmlreader.InputSource): """ TODO: """ def __init__(self, system_id: Optional[str] = None): xmlreader.InputSource.__init__(self, system_id=system_id) self.content_type: Optional[str] = None self.auto_close = False # see Graph.parse(), true if opened by us def close(self) -> None: c = self.getCharacterStream() if c and hasattr(c, "close"): try: c.close() except Exception: pass f = self.getByteStream() if f and hasattr(f, "close"): try: f.close() except Exception: pass class PythonInputSource(InputSource): """ Constructs an RDFLib Parser InputSource from a Python data structure, for example, loaded from JSON with json.load or json.loads: >>> import json >>> as_string = \"\"\"{ ... "@context" : {"ex" : "http://example.com/ns#"}, ... "@graph": [{"@type": "ex:item", "@id": "#example"}] ... }\"\"\" >>> as_python = json.loads(as_string) >>> source = create_input_source(data=as_python) >>> isinstance(source, PythonInputSource) True """ def __init__(self, data: Any, system_id: Optional[str] = None): self.content_type = None self.auto_close = False # see Graph.parse(), true if opened by us self.public_id: Optional[str] = None self.system_id: Optional[str] = system_id self.data = data def getPublicId(self) -> Optional[str]: # noqa: N802 return self.public_id def setPublicId(self, public_id: Optional[str]) -> None: # noqa: N802 self.public_id = public_id def getSystemId(self) -> Optional[str]: # noqa: N802 return self.system_id def setSystemId(self, system_id: Optional[str]) -> None: # noqa: N802 self.system_id = system_id def close(self) -> None: self.data = None class StringInputSource(InputSource): """ Constructs an RDFLib Parser InputSource from a Python String or Bytes """ def __init__( self, value: Union[str, bytes], encoding: str = "utf-8", system_id: Optional[str] = None, ): super(StringInputSource, self).__init__(system_id) stream: Union[BinaryIO, TextIO] if isinstance(value, str): stream = StringIO(value) self.setCharacterStream(stream) self.setEncoding(encoding) b_stream = BytesIOWrapper(value, encoding) self.setByteStream(b_stream) else: stream = BytesIO(value) self.setByteStream(stream) c_stream = TextIOWrapper(stream, encoding) self.setCharacterStream(c_stream) self.setEncoding(c_stream.encoding) headers = { "User-agent": "rdflib-%s (https://rdflib.github.io/; eikeon@eikeon.com)" % __version__ } class URLInputSource(InputSource): """ Constructs an RDFLib Parser InputSource from a URL to read it from the Web. """ links: List[str] @classmethod def getallmatchingheaders(cls, message: "Message", name) -> List[str]: # This is reimplemented here, because the method # getallmatchingheaders from HTTPMessage is broken since Python 3.0 name = name.lower() return [val for key, val in message.items() if key.lower() == name] @classmethod def get_links(cls, response: addinfourl) -> List[str]: linkslines = cls.getallmatchingheaders(response.headers, "Link") retarray: List[str] = [] for linksline in linkslines: links = [linkstr.strip() for linkstr in linksline.split(",")] for link in links: retarray.append(link) return retarray def get_alternates(self, type_: Optional[str] = None) -> List[str]: typestr: Optional[str] = f'type="{type_}"' if type_ else None relstr = 'rel="alternate"' alts = [] for link in self.links: parts = [p.strip() for p in link.split(";")] if relstr not in parts: continue if typestr: if typestr in parts: alts.append(parts[0].strip("<>")) else: alts.append(parts[0].strip("<>")) return alts def __init__(self, system_id: Optional[str] = None, format: Optional[str] = None): super(URLInputSource, self).__init__(system_id) self.url = system_id # copy headers to change myheaders = dict(headers) if format == "xml": myheaders["Accept"] = "application/rdf+xml, */*;q=0.1" elif format == "n3": myheaders["Accept"] = "text/n3, */*;q=0.1" elif format in ["turtle", "ttl"]: myheaders["Accept"] = "text/turtle, application/x-turtle, */*;q=0.1" elif format == "nt": myheaders["Accept"] = "text/plain, */*;q=0.1" elif format == "trig": myheaders["Accept"] = "application/trig, */*;q=0.1" elif format == "trix": myheaders["Accept"] = "application/trix, */*;q=0.1" elif format == "json-ld": myheaders[ "Accept" ] = "application/ld+json, application/json;q=0.9, */*;q=0.1" else: # if format not given, create an Accept header from all registered # parser Media Types from rdflib.parser import Parser from rdflib.plugin import plugins acc = [] for p in plugins(kind=Parser): # only get parsers if "/" in p.name: # all Media Types known have a / in them acc.append(p.name) myheaders["Accept"] = ", ".join(acc) req = Request(system_id, None, myheaders) # type: ignore[arg-type] response: addinfourl = _urlopen(req) self.url = response.geturl() # in case redirections took place self.links = self.get_links(response) if format in ("json-ld", "application/ld+json"): alts = self.get_alternates(type_="application/ld+json") for link in alts: full_link = urljoin(self.url, link) if full_link != self.url and full_link != system_id: response = _urlopen(Request(full_link)) self.url = response.geturl() # in case redirections took place break self.setPublicId(self.url) content_types = self.getallmatchingheaders(response.headers, "content-type") self.content_type = content_types[0] if content_types else None if self.content_type is not None: self.content_type = self.content_type.split(";", 1)[0] self.setByteStream(response) # TODO: self.setEncoding(encoding) self.response_info = response.info() # a mimetools.Message instance def __repr__(self) -> str: # type error: Incompatible return value type (got "Optional[str]", expected "str") return self.url # type: ignore[return-value] class FileInputSource(InputSource): def __init__( self, file: Union[BinaryIO, TextIO, TextIOBase, RawIOBase, BufferedIOBase] ): base = pathlib.Path.cwd().as_uri() system_id = URIRef(pathlib.Path(file.name).absolute().as_uri(), base=base) # type: ignore[union-attr] super(FileInputSource, self).__init__(system_id) self.file = file if isinstance(file, TextIOBase): # Python3 unicode fp self.setCharacterStream(file) self.setEncoding(file.encoding) try: b = file.buffer # type: ignore[attr-defined] self.setByteStream(b) except (AttributeError, LookupError): self.setByteStream(file) else: self.setByteStream(file) # We cannot set characterStream here because # we do not know the Raw Bytes File encoding. def __repr__(self) -> str: return repr(self.file) def create_input_source( source: Optional[ Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath] ] = None, publicID: Optional[str] = None, # noqa: N803 location: Optional[str] = None, file: Optional[Union[BinaryIO, TextIO]] = None, data: Optional[Union[str, bytes, dict]] = None, format: Optional[str] = None, ) -> InputSource: """ Return an appropriate InputSource instance for the given parameters. """ # test that exactly one of source, location, file, and data is not None. non_empty_arguments = list( filter( lambda v: v is not None, [source, location, file, data], ) ) if len(non_empty_arguments) != 1: raise ValueError( "exactly one of source, location, file or data must be given", ) input_source = None if source is not None: if TYPE_CHECKING: assert file is None assert data is None assert location is None if isinstance(source, InputSource): input_source = source else: if isinstance(source, str): location = source elif isinstance(source, pathlib.PurePath): location = str(source) elif isinstance(source, bytes): data = source elif hasattr(source, "read") and not isinstance(source, Namespace): f = source input_source = InputSource() if hasattr(source, "encoding"): input_source.setCharacterStream(source) input_source.setEncoding(source.encoding) try: b = source.buffer # type: ignore[union-attr] input_source.setByteStream(b) except (AttributeError, LookupError): input_source.setByteStream(source) else: input_source.setByteStream(f) if f is sys.stdin: input_source.setSystemId("file:///dev/stdin") elif hasattr(f, "name"): input_source.setSystemId(f.name) else: raise Exception( "Unexpected type '%s' for source '%s'" % (type(source), source) ) absolute_location = None # Further to fix for issue 130 auto_close = False # make sure we close all file handles we open if location is not None: if TYPE_CHECKING: assert file is None assert data is None assert source is None ( absolute_location, auto_close, file, input_source, ) = _create_input_source_from_location( file=file, format=format, input_source=input_source, location=location, ) if file is not None: if TYPE_CHECKING: assert location is None assert data is None assert source is None input_source = FileInputSource(file) if data is not None: if TYPE_CHECKING: assert location is None assert file is None assert source is None if isinstance(data, dict): input_source = PythonInputSource(data) auto_close = True elif isinstance(data, (str, bytes, bytearray)): input_source = StringInputSource(data) auto_close = True else: raise RuntimeError(f"parse data can only str, or bytes. not: {type(data)}") if input_source is None: raise Exception("could not create InputSource") else: input_source.auto_close |= auto_close if publicID is not None: # Further to fix for issue 130 input_source.setPublicId(publicID) # Further to fix for issue 130 elif input_source.getPublicId() is None: input_source.setPublicId(absolute_location or "") return input_source def _create_input_source_from_location( file: Optional[Union[BinaryIO, TextIO]], format: Optional[str], input_source: Optional[InputSource], location: str, ) -> Tuple[URIRef, bool, Optional[Union[BinaryIO, TextIO]], Optional[InputSource]]: # Fix for Windows problem https://github.com/RDFLib/rdflib/issues/145 and # https://github.com/RDFLib/rdflib/issues/1430 # NOTE: using pathlib.Path.exists on a URL fails on windows as it is not a # valid path. However os.path.exists() returns false for a URL on windows # which is why it is being used instead. if os.path.exists(location): location = pathlib.Path(location).absolute().as_uri() base = pathlib.Path.cwd().as_uri() absolute_location = URIRef(rdflib.util._iri2uri(location), base=base) if absolute_location.startswith("file:///"): filename = url2pathname(absolute_location.replace("file:///", "/")) file = open(filename, "rb") else: input_source = URLInputSource(absolute_location, format) auto_close = True # publicID = publicID or absolute_location # Further to fix # for issue 130 return absolute_location, auto_close, file, input_source