# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

import collections.abc as collections_abc
from itertools import chain
from typing import Any

from six import iteritems, itervalues

from opensearchpy.connection.connections import get_connection
from opensearchpy.helpers.field import Nested, Text, construct_field

from .utils import DslBase

META_FIELDS = frozenset(
    (
        "dynamic",
        "transform",
        "dynamic_date_formats",
        "date_detection",
        "numeric_detection",
        "dynamic_templates",
        "enabled",
    )
)


class Properties(DslBase):
    name = "properties"
    _param_defs = {"properties": {"type": "field", "hash": True}}

    def __init__(self) -> None:
        super(Properties, self).__init__()

    def __repr__(self) -> str:
        return "Properties()"

    def __getitem__(self, name: Any) -> Any:
        return self.properties[name]

    def __contains__(self, name: Any) -> bool:
        return name in self.properties

    def to_dict(self) -> Any:
        return super(Properties, self).to_dict()["properties"]

    def field(self, name: Any, *args: Any, **kwargs: Any) -> "Properties":
        self.properties[name] = construct_field(*args, **kwargs)
        return self

    def _collect_fields(self) -> Any:
        """Iterate over all Field objects within, including multi fields."""
        for f in itervalues(self.properties.to_dict()):
            yield f
            # multi fields
            if hasattr(f, "fields"):
                for inner_f in itervalues(f.fields.to_dict()):
                    yield inner_f
            # nested and inner objects
            if hasattr(f, "_collect_fields"):
                for inner_f in f._collect_fields():
                    yield inner_f

    def update(self, other_object: Any) -> None:
        if not hasattr(other_object, "properties"):
            # not an inner/nested object, no merge possible
            return

        our, other = self.properties, other_object.properties
        for name in other:
            if name in our:
                if hasattr(our[name], "update"):
                    our[name].update(other[name])
                continue
            our[name] = other[name]


class Mapping(object):
    def __init__(self) -> None:
        self.properties = Properties()
        self._meta: Any = {}

    def __repr__(self) -> str:
        return "Mapping()"

    def _clone(self) -> Any:
        m = Mapping()
        m.properties._params = self.properties._params.copy()
        return m

    @classmethod
    def from_opensearch(cls, index: Any, using: str = "default") -> Any:
        m = cls()
        m.update_from_opensearch(index, using)
        return m

    def resolve_nested(self, field_path: Any) -> Any:
        field = self
        nested = []
        parts = field_path.split(".")
        for i, step in enumerate(parts):
            try:
                field = field[step]
            except KeyError:
                return (), None
            if isinstance(field, Nested):
                nested.append(".".join(parts[: i + 1]))
        return nested, field

    def resolve_field(self, field_path: Any) -> Any:
        field = self
        for step in field_path.split("."):
            try:
                field = field[step]
            except KeyError:
                return None
        return field

    def _collect_analysis(self) -> Any:
        analysis: Any = {}
        fields: Any = []
        if "_all" in self._meta:
            fields.append(Text(**self._meta["_all"]))

        for f in chain(fields, self.properties._collect_fields()):
            for analyzer_name in (
                "analyzer",
                "normalizer",
                "search_analyzer",
                "search_quote_analyzer",
            ):
                if not hasattr(f, analyzer_name):
                    continue
                analyzer = getattr(f, analyzer_name)
                d = analyzer.get_analysis_definition()
                # empty custom analyzer, probably already defined out of our control
                if not d:
                    continue

                # merge the definition
                # TODO: conflict detection/resolution
                for key in d:
                    analysis.setdefault(key, {}).update(d[key])

        return analysis

    def save(self, index: Any, using: str = "default") -> Any:
        from opensearchpy.helpers.index import Index

        index = Index(index, using=using)
        index.mapping(self)
        return index.save()

    def update_from_opensearch(self, index: Any, using: str = "default") -> None:
        opensearch = get_connection(using)
        raw = opensearch.indices.get_mapping(index=index)
        _, raw = raw.popitem()
        self._update_from_dict(raw["mappings"])

    def _update_from_dict(self, raw: Any) -> None:
        for name, definition in iteritems(raw.get("properties", {})):
            self.field(name, definition)

        # metadata like _all etc
        for name, value in iteritems(raw):
            if name != "properties":
                if isinstance(value, collections_abc.Mapping):
                    self.meta(name, **value)
                else:
                    self.meta(name, value)

    def update(self, mapping: Any, update_only: bool = False) -> None:
        for name in mapping:
            if update_only and name in self:
                # nested and inner objects, merge recursively
                if hasattr(self[name], "update"):
                    # FIXME only merge subfields, not the settings
                    self[name].update(mapping[name], update_only)
                continue
            self.field(name, mapping[name])

        if update_only:
            for name in mapping._meta:
                if name not in self._meta:
                    self._meta[name] = mapping._meta[name]
        else:
            self._meta.update(mapping._meta)

    def __contains__(self, name: Any) -> Any:
        return name in self.properties.properties

    def __getitem__(self, name: Any) -> Any:
        return self.properties.properties[name]

    def __iter__(self) -> Any:
        return iter(self.properties.properties)

    def field(self, *args: Any, **kwargs: Any) -> "Mapping":
        self.properties.field(*args, **kwargs)
        return self

    def meta(self, name: Any, params: Any = None, **kwargs: Any) -> "Mapping":
        if not name.startswith("_") and name not in META_FIELDS:
            name = "_" + name

        if params and kwargs:
            raise ValueError("Meta configs cannot have both value and a dictionary.")

        self._meta[name] = kwargs if params is None else params
        return self

    def to_dict(self) -> Any:
        meta = self._meta

        # hard coded serialization of analyzers in _all
        if "_all" in meta:
            meta = meta.copy()
            _all = meta["_all"] = meta["_all"].copy()
            for f in ("analyzer", "search_analyzer", "search_quote_analyzer"):
                if hasattr(_all.get(f, None), "to_dict"):
                    _all[f] = _all[f].to_dict()
        meta.update(self.properties.to_dict())
        return meta