# Copyright (C) 2017-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Pattern,
    Tuple,
    TypeVar,
    Union,
    cast,
)
import uuid
import xml.parsers.expat
from pyld import jsonld
import rdflib
from typing_extensions import TypedDict
import xmltodict
import yaml
from swh.indexer.codemeta import _document_loader, compact
from swh.indexer.namespaces import RDF, SCHEMA
from swh.indexer.storage.interface import Sha1
from swh.objstorage.interface import CompositeObjId, objid_from_dict
from .utils import add_url_if_valid
TMP_ROOT_URI_PREFIX = "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
"""Prefix used to generate temporary URIs for root nodes being translated."""
[docs]
class DirectoryLsEntry(TypedDict):
    target: Sha1
    sha1: Optional[Sha1]
    sha1_git: Optional[bytes]
    sha256: Optional[bytes]
    name: bytes
    type: str 
TTranslateCallable = TypeVar(
    "TTranslateCallable",
    bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None],
)
[docs]
def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
    """Returns a decorator that marks the decorated function as adding
    the given terms to the ``translated_metadata`` dict"""
    def decorator(f: TTranslateCallable) -> TTranslateCallable:
        if not hasattr(f, "produced_terms"):
            f.produced_terms = []  # type: ignore
        f.produced_terms.extend(uris)  # type: ignore
        return f
    return decorator 
[docs]
class BaseMapping:
    """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`,
    not to be inherited directly."""
    def __init__(self, log_suffix=""):
        self.log_suffix = log_suffix
        self.log = logging.getLogger(
            "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        )
    @property
    def name(self):
        """A name of this mapping, used as an identifier in the
        indexer storage."""
        raise NotImplementedError(f"{self.__class__.__name__}.name")
[docs]
    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """
        Translates content by parsing content from a bytestring containing
        mapping-specific data and translating with the appropriate mapping
        to JSON-LD using the Codemeta and ForgeFed vocabularies.
        Args:
            raw_content: raw content to translate
        Returns:
            translated metadata in JSON friendly form needed for the content
            if parseable, :const:`None` otherwise.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.translate") 
[docs]
    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation") 
 
[docs]
class BaseExtrinsicMapping(BaseMapping):
    """Base class for extrinsic_metadata mappings to inherit from
    To implement a new mapping:
    - inherit this class
    - override translate function
    """
[docs]
    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        return compact(metadata, forgefed=True) 
 
[docs]
class BaseIntrinsicMapping(BaseMapping):
    """Base class for intrinsic-metadata mappings to inherit from
    To implement a new mapping:
    - inherit this class
    - override translate function
    """
[docs]
    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        return compact(metadata, forgefed=False) 
 
[docs]
class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
    """Base class for all intrinsic metadata mappings that use a single file as input."""
    filename: Union[bytes, Pattern[bytes]]
 
[docs]
class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""
    string_fields: List[str] = []
    """List of fields that are simple strings, and don't need any
    normalization."""
    date_fields: List[str] = []
    """List of fields that are strings that should be typed as http://schema.org/Date
    """
    uri_fields: List[str] = []
    """List of fields that are simple URIs, and don't need any
    normalization."""
    @property
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        raise NotImplementedError(f"{self.__class__.__name__}.mapping")
    @staticmethod
    def _normalize_method_name(name: str) -> str:
        return name.replace("-", "_")
[docs]
    @classmethod
    def supported_terms(cls):
        # one-to-one mapping from the original key to a CodeMeta term
        simple_terms = {
            str(term)
            for (key, term) in cls.mapping.items()
            if key in cls.string_fields + cls.date_fields + cls.uri_fields
            or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
        }
        # more complex mapping from the original key to JSON-LD
        complex_terms = {
            str(term)
            for meth_name in dir(cls)
            if meth_name.startswith("translate_")
            for term in getattr(getattr(cls, meth_name), "produced_terms", [])
        }
        return simple_terms | complex_terms 
[docs]
    def get_root_uri(self, content_dict: Dict) -> rdflib.URIRef:
        """Returns an URI for the SoftwareSourceCode or Repository being described.
        The default implementation uses a temporary URI that is stripped before
        normalization by :meth:`_translate_dict`.
        """
        # The main object being described (the SoftwareSourceCode) does not necessarily
        # may or may not have an id.
        # If it does, it will need to be set by a subclass.
        # If it doesn't we temporarily use this URI to identify it. Unfortunately,
        # we cannot use a blank node as we need to use it for JSON-LD framing later,
        # and blank nodes cannot be used for framing in JSON-LD >= 1.1
        root_id = TMP_ROOT_URI_PREFIX + str(uuid.uuid4())
        return rdflib.URIRef(root_id) 
    def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]:
        """
        Translates content  by parsing content from a dict object
        and translating with the appropriate mapping
        Args:
            content_dict (dict): content dict to translate
        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer
        """
        graph = rdflib.Graph()
        root = self.get_root_uri(content_dict)
        self._translate_to_graph(graph, root, content_dict)
        self.sanitize(graph)
        # Convert from rdflib's internal graph representation to JSON
        s = graph.serialize(format="application/ld+json")
        # Load from JSON to a list of Python objects
        jsonld_graph = json.loads(s)
        # Use JSON-LD framing to turn the graph into a rooted tree
        # frame = {"@type": str(SCHEMA.SoftwareSourceCode)}
        translated_metadata = jsonld.frame(
            jsonld_graph,
            {"@id": str(root)},
            options={
                "documentLoader": _document_loader,
                "processingMode": "json-ld-1.1",
            },
        )
        # Remove the temporary id we added at the beginning
        assert isinstance(translated_metadata["@id"], str)
        if translated_metadata["@id"].startswith(TMP_ROOT_URI_PREFIX):
            del translated_metadata["@id"]
        return self.normalize_translation(translated_metadata)
    def _translate_to_graph(
        self, graph: rdflib.Graph, root: rdflib.term.Identifier, content_dict: Dict
    ) -> None:
        """
        Translates content  by parsing content from a dict object
        and translating with the appropriate mapping to the graph passed as parameter
        Args:
            content_dict (dict): content dict to translate
        """
        graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode))
        for k, v in content_dict.items():
            # First, check if there is a specific translation
            # method for this key
            translation_method = getattr(
                self, "translate_" + self._normalize_method_name(k), None
            )
            if translation_method:
                translation_method(graph, root, v)
            elif k in self.mapping:
                # if there is no method, but the key is known from the
                # crosswalk table
                codemeta_key = self.mapping[k]
                # if there is a normalization method, use it on the value,
                # and add its results to the triples
                normalization_method = getattr(
                    self, "normalize_" + self._normalize_method_name(k), None
                )
                if normalization_method:
                    v = normalization_method(v)
                    if v is None:
                        pass
                    elif isinstance(v, list):
                        for item in reversed(v):
                            if isinstance(item, rdflib.URIRef):
                                add_url_if_valid(graph, root, codemeta_key, str(item))
                            else:
                                graph.add((root, codemeta_key, item))
                    else:
                        if isinstance(v, rdflib.URIRef):
                            add_url_if_valid(graph, root, codemeta_key, str(v))
                        else:
                            graph.add((root, codemeta_key, v))
                elif k in self.string_fields and isinstance(v, str):
                    graph.add((root, codemeta_key, rdflib.Literal(v)))
                elif k in self.string_fields and isinstance(v, list):
                    for item in v:
                        graph.add((root, codemeta_key, rdflib.Literal(item)))
                elif k in self.date_fields and isinstance(v, str):
                    typed_v = rdflib.Literal(v, datatype=SCHEMA.Date)
                    graph.add((root, codemeta_key, typed_v))
                elif k in self.date_fields and isinstance(v, list):
                    for item in v:
                        if isinstance(item, str):
                            typed_item = rdflib.Literal(item, datatype=SCHEMA.Date)
                            graph.add((root, codemeta_key, typed_item))
                elif k in self.uri_fields and isinstance(v, str):
                    add_url_if_valid(graph, root, codemeta_key, v)
                elif k in self.uri_fields and isinstance(v, list):
                    for item in v:
                        add_url_if_valid(graph, root, codemeta_key, item)
                else:
                    continue
        self.extra_translation(graph, root, content_dict)
[docs]
    def sanitize(self, graph: rdflib.Graph) -> None:
        # Remove triples that make PyLD crash
        for subject, predicate, _ in graph.triples((None, None, rdflib.URIRef(""))):
            graph.remove((subject, predicate, rdflib.URIRef("")))
        # Should not happen, but we's better check as this may lead to incorrect data
        invalid = False
        for triple in graph.triples((rdflib.URIRef(""), None, None)):
            invalid = True
            logging.error("Empty triple subject URI: %r", triple)
        if invalid:
            raise ValueError("Empty triple subject(s)") 
 
[docs]
class JsonMapping(DictMapping):
    """Base class for all mappings that use JSON data as input."""
[docs]
    def translate(self, raw_content: bytes) -> Optional[Dict]:
        try:
            raw_content_string: str = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None
        try:
            content_dict = json.loads(raw_content_string)
        except json.JSONDecodeError:
            self.log.warning("Error unjsoning from %s", self.log_suffix)
            return None
        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)
        return None 
 
[docs]
class XmlMapping(DictMapping):
    """Base class for all mappings that use XML data as input."""
[docs]
    def translate(self, raw_content: bytes) -> Optional[Dict]:
        try:
            d = xmltodict.parse(raw_content)
        except xml.parsers.expat.ExpatError:
            self.log.warning("Error parsing XML from %s", self.log_suffix)
            return None
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding XML from %s", self.log_suffix)
            return None
        except (LookupError, ValueError):
            # unknown encoding or multi-byte encoding
            self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
            return None
        if not isinstance(d, dict):
            self.log.warning("Skipping ill-formed XML content: %s", raw_content)
            return None
        return self._translate_dict(d) 
 
[docs]
class SafeLoader(yaml.SafeLoader):
    yaml_implicit_resolvers = {
        k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
        for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
    } 
[docs]
class YamlMapping(DictMapping):
    """Base class for all mappings that use Yaml data as input."""
[docs]
    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
        raw_content_string: str = raw_content.decode()
        try:
            content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
        except yaml.YAMLError:
            return None
        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)
        return None