"""
Referencing implementations for JSON Schema specs (historic & current).
"""

from __future__ import annotations

from collections.abc import Sequence, Set
from typing import Any, Iterable, Union

from referencing import Anchor, Registry, Resource, Specification, exceptions
from referencing._attrs import frozen
from referencing._core import (
    _UNSET,  # type: ignore[reportPrivateUsage]
    Resolved as _Resolved,
    Resolver as _Resolver,
    _Unset,  # type: ignore[reportPrivateUsage]
)
from referencing.typing import URI, Anchor as AnchorType, Mapping

#: A JSON Schema which is a JSON object
ObjectSchema = Mapping[str, Any]

#: A JSON Schema of any kind
Schema = Union[bool, ObjectSchema]

#: A Resource whose contents are JSON Schemas
SchemaResource = Resource[Schema]

#: A JSON Schema Registry
SchemaRegistry = Registry[Schema]

#: The empty JSON Schema Registry
EMPTY_REGISTRY: SchemaRegistry = Registry()


@frozen
class UnknownDialect(Exception):
    """
    A dialect identifier was found for a dialect unknown by this library.

    If it's a custom ("unofficial") dialect, be sure you've registered it.
    """

    uri: URI


def _dollar_id(contents: Schema) -> URI | None:
    if isinstance(contents, bool):
        return
    return contents.get("$id")


def _legacy_dollar_id(contents: Schema) -> URI | None:
    if isinstance(contents, bool) or "$ref" in contents:
        return
    id = contents.get("$id")
    if id is not None and not id.startswith("#"):
        return id


def _legacy_id(contents: ObjectSchema) -> URI | None:
    if "$ref" in contents:
        return
    id = contents.get("id")
    if id is not None and not id.startswith("#"):
        return id


def _anchor(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[AnchorType[Schema]]:
    if isinstance(contents, bool):
        return
    anchor = contents.get("$anchor")
    if anchor is not None:
        yield Anchor(
            name=anchor,
            resource=specification.create_resource(contents),
        )

    dynamic_anchor = contents.get("$dynamicAnchor")
    if dynamic_anchor is not None:
        yield DynamicAnchor(
            name=dynamic_anchor,
            resource=specification.create_resource(contents),
        )


def _anchor_2019(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[Anchor[Schema]]:
    if isinstance(contents, bool):
        return []
    anchor = contents.get("$anchor")
    if anchor is None:
        return []
    return [
        Anchor(
            name=anchor,
            resource=specification.create_resource(contents),
        ),
    ]


def _legacy_anchor_in_dollar_id(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[Anchor[Schema]]:
    if isinstance(contents, bool):
        return []
    id = contents.get("$id", "")
    if not id.startswith("#"):
        return []
    return [
        Anchor(
            name=id[1:],
            resource=specification.create_resource(contents),
        ),
    ]


def _legacy_anchor_in_id(
    specification: Specification[ObjectSchema],
    contents: ObjectSchema,
) -> Iterable[Anchor[ObjectSchema]]:
    id = contents.get("id", "")
    if not id.startswith("#"):
        return []
    return [
        Anchor(
            name=id[1:],
            resource=specification.create_resource(contents),
        ),
    ]


def _subresources_of(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Create a callable returning JSON Schema specification-style subschemas.

    Relies on specifying the set of keywords containing subschemas in their
    values, in a subobject's values, or in a subarray.
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        if isinstance(contents, bool):
            return
        for each in in_value:
            if each in contents:
                yield contents[each]
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

    return subresources_of


def _subresources_of_with_crazy_items(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle older drafts where there are some funky keywords.
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        if isinstance(contents, bool):
            return
        for each in in_value:
            if each in contents:
                yield contents[each]
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items

    return subresources_of


def _subresources_of_with_crazy_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle older drafts where there are some funky keywords.
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        if isinstance(contents, bool):
            return
        for each in in_value:
            if each in contents:
                yield contents[each]
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items
        dependencies = contents.get("dependencies")
        if dependencies is not None:
            values = iter(dependencies.values())
            value = next(values, None)
            if isinstance(value, Mapping):
                yield value
                yield from values

    return subresources_of


def _subresources_of_with_crazy_aP_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle even older drafts where there are some funky keywords.
    """

    def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
        for each in in_value:
            if each in contents:
                yield contents[each]
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items
        dependencies = contents.get("dependencies")
        if dependencies is not None:
            values = iter(dependencies.values())
            value = next(values, None)
            if isinstance(value, Mapping):
                yield value
                yield from values

        for each in "additionalItems", "additionalProperties":
            value = contents.get(each)
            if isinstance(value, Mapping):
                yield value

    return subresources_of


def _maybe_in_subresource(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        _segments = iter(segments)
        for segment in _segments:
            if segment not in in_value and (
                segment not in in_child or next(_segments, None) is None
            ):
                return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource


def _maybe_in_subresource_crazy_items(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        _segments = iter(segments)
        for segment in _segments:
            if segment == "items" and isinstance(
                subresource.contents,
                Mapping,
            ):
                return resolver.in_subresource(subresource)
            if segment not in in_value and (
                segment not in in_child or next(_segments, None) is None
            ):
                return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource


def _maybe_in_subresource_crazy_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        _segments = iter(segments)
        for segment in _segments:
            if segment in {"items", "dependencies"} and isinstance(
                subresource.contents,
                Mapping,
            ):
                return resolver.in_subresource(subresource)
            if segment not in in_value and (
                segment not in in_child or next(_segments, None) is None
            ):
                return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource


#: JSON Schema draft 2020-12
DRAFT202012 = Specification(
    name="draft2020-12",
    id_of=_dollar_id,
    subresources_of=_subresources_of(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor,
    maybe_in_subresource=_maybe_in_subresource(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)
#: JSON Schema draft 2019-09
DRAFT201909 = Specification(
    name="draft2019-09",
    id_of=_dollar_id,
    subresources_of=_subresources_of_with_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor_2019,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)
#: JSON Schema draft 7
DRAFT7 = Specification(
    name="draft-07",
    id_of=_legacy_dollar_id,
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 6
DRAFT6 = Specification(
    name="draft-06",
    id_of=_legacy_dollar_id,
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 4
DRAFT4 = Specification(
    name="draft-04",
    id_of=_legacy_id,
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_value={"not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties", "not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 3
DRAFT3 = Specification(
    name="draft-03",
    id_of=_legacy_id,
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties"},
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)


_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
    {  # type: ignore[reportGeneralTypeIssues]  # :/ internal vs external types
        dialect_id: Resource.opaque(specification)
        for dialect_id, specification in [
            ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
            ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
            ("http://json-schema.org/draft-07/schema", DRAFT7),
            ("http://json-schema.org/draft-06/schema", DRAFT6),
            ("http://json-schema.org/draft-04/schema", DRAFT4),
            ("http://json-schema.org/draft-03/schema", DRAFT3),
        ]
    },
)


def specification_with(
    dialect_id: URI,
    default: Specification[Any] | _Unset = _UNSET,
) -> Specification[Any]:
    """
    Retrieve the `Specification` with the given dialect identifier.

    Raises:

        `UnknownDialect`

            if the given ``dialect_id`` isn't known

    """
    resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
    if resource is not None:
        return resource.contents
    if default is _UNSET:
        raise UnknownDialect(dialect_id)
    return default


@frozen
class DynamicAnchor:
    """
    Dynamic anchors, introduced in draft 2020.
    """

    name: str
    resource: SchemaResource

    def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
        """
        Resolve this anchor dynamically.
        """
        last = self.resource
        for uri, registry in resolver.dynamic_scope():
            try:
                anchor = registry.anchor(uri, self.name).value
            except exceptions.NoSuchAnchor:
                continue
            if isinstance(anchor, DynamicAnchor):
                last = anchor.resource
        return _Resolved(
            contents=last.contents,
            resolver=resolver.in_subresource(last),
        )


def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
    """
    Recursive references (via recursive anchors), present only in draft 2019.

    As per the 2019 specification (ยง 8.2.4.2.1), only the ``#`` recursive
    reference is supported (and is therefore assumed to be the relevant
    reference).
    """
    resolved = resolver.lookup("#")
    if isinstance(resolved.contents, Mapping) and resolved.contents.get(
        "$recursiveAnchor",
    ):
        for uri, _ in resolver.dynamic_scope():
            next_resolved = resolver.lookup(uri)
            if not isinstance(
                next_resolved.contents,
                Mapping,
            ) or not next_resolved.contents.get("$recursiveAnchor"):
                break
            resolved = next_resolved
    return resolved