# Copyright (C) 2015-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import math
from typing import Any, Dict, Optional, Tuple
import warnings
from swh.core.utils import encode_with_unescape
from swh.model.model import (
    ExtID,
    MetadataAuthority,
    MetadataAuthorityType,
    MetadataFetcher,
    ObjectType,
    Origin,
    Person,
    RawExtrinsicMetadata,
    Release,
    Revision,
    RevisionType,
    Timestamp,
    TimestampWithTimezone,
)
from swh.model.swhids import CoreSWHID
from swh.model.swhids import ExtendedObjectType as SwhidExtendedObjectType
from swh.model.swhids import ExtendedSWHID
from swh.model.swhids import ObjectType as SwhidObjectType
from swh.storage.interface import ObjectReference
from ..utils import map_optional
# Row fragment used when an object has no author/committer: every column NULL.
DEFAULT_AUTHOR: Dict[str, Any] = dict.fromkeys(("fullname", "name", "email"))

# Row fragment used when an object has no date: every column is NULL except
# the numeric offset, which is 0.
DEFAULT_DATE: Dict[str, Any] = {
    "timestamp": None,
    "offset": 0,
    "neg_utc_offset": None,
    "offset_bytes": None,
}
[docs]
def author_to_db(author: Optional[Person]) -> Dict[str, Any]:
    """Convert a swh-model author to its DB representation.

    Args:
        author: a :mod:`swh.model` compatible author, or None

    Returns:
        dict: a dictionary with the keys ``fullname``, ``name`` and ``email``.
        When ``author`` is None, a fresh copy of ``DEFAULT_AUTHOR`` is
        returned (all three values are None).
    """
    if author is None:
        # Return a copy: handing out the module-level dict itself would let
        # any caller that mutates the result corrupt later defaults.
        return dict(DEFAULT_AUTHOR)
    return author.to_dict()
[docs]
def db_to_author(
    fullname: Optional[bytes], name: Optional[bytes], email: Optional[bytes]
) -> Optional[Person]:
    """Build a swh-model author from the three author columns of a DB row.

    Args:
        fullname (bytes): the author's fullname
        name (bytes): the author's name
        email (bytes): the author's email

    Returns:
        a Person object, or None if 'fullname' is None.
    """
    if fullname is None:
        return None

    has_parsed_parts = name is not None or email is not None
    if has_parsed_parts:
        return Person(fullname=fullname, name=name, email=email)

    # Neither name nor email is stored, i.e. the fullname hasn't been parsed:
    # try that again
    return Person.from_fullname(fullname)
[docs]
def db_to_date(
    date: Optional[datetime.datetime],
    offset_bytes: bytes,
) -> Optional[TimestampWithTimezone]:
    """Build a swh-model date from the date columns of a DB row.

    Args:
        date: a date pulled out of the database
        offset_bytes: the raw bytes of the UTC offset, usually as "+HHMM"
          or "-HHMM"; passed through unchanged

    Returns:
        a TimestampWithTimezone, or None if the date is None.
    """
    if date is None:
        return None

    # floor() rather than int() so that negative dates are rounded down; the
    # microseconds are dropped first to avoid Python producing a timestamp
    # offset by one for very particular edge case datetimes
    whole_seconds = math.floor(date.replace(microsecond=0).timestamp())

    model_timestamp = Timestamp(
        seconds=whole_seconds,
        microseconds=date.microsecond,
    )
    return TimestampWithTimezone(
        timestamp=model_timestamp,
        offset_bytes=offset_bytes,
    )
[docs]
def date_to_db(ts_with_tz: Optional[TimestampWithTimezone]) -> Dict[str, Any]:
    """Convert a swh-model date_offset to its DB representation.

    Args:
        ts_with_tz: a TimestampWithTimezone object, or None

    Returns:
        dict: a dictionary with these keys:

            - timestamp: the date, in UTC, in ISO format
            - offset: the value of ``ts_with_tz.offset_minutes()``
            - neg_utc_offset: True when the offset is zero and its byte
              representation starts with "-"; False otherwise
            - offset_bytes: a byte representation of the offset, usually as
              "+HHMM" or "-HHMM"

        When ``ts_with_tz`` is None, a fresh copy of ``DEFAULT_DATE`` is
        returned instead.
    """
    if ts_with_tz is None:
        # Return a copy: handing out the module-level dict itself would let
        # any caller that mutates the result corrupt later defaults.
        return dict(DEFAULT_DATE)

    ts = ts_with_tz.timestamp
    timestamp = datetime.datetime.fromtimestamp(
        ts.seconds, datetime.timezone.utc
    ).replace(microsecond=ts.microseconds)

    # Read once and reuse for both the "offset" and "neg_utc_offset" columns
    offset = ts_with_tz.offset_minutes()
    offset_bytes = ts_with_tz.offset_bytes

    return {
        # PostgreSQL supports isoformatted timestamps
        "timestamp": timestamp.isoformat(),
        "offset": offset,
        "neg_utc_offset": offset == 0 and offset_bytes.startswith(b"-"),
        "offset_bytes": offset_bytes,
    }
[docs]
def revision_to_db(revision: Revision) -> Dict[str, Any]:
    """Flatten a swh-model revision into its database representation.

    The result holds the revision's own columns plus a ``parents`` list with
    one ``{"id", "parent_id", "parent_rank"}`` dictionary per parent, ranked
    in the order of ``revision.parents``.
    """
    author_cols = author_to_db(revision.author)
    date_cols = date_to_db(revision.date)
    committer_cols = author_to_db(revision.committer)
    committer_date_cols = date_to_db(revision.committer_date)

    revision_id = revision.id

    if revision.metadata is None:
        metadata = None
    else:
        metadata = dict(revision.metadata)

    parent_rows = []
    for rank, parent_id in enumerate(revision.parents):
        parent_rows.append(
            {
                "id": revision_id,
                "parent_id": parent_id,
                "parent_rank": rank,
            }
        )

    row: Dict[str, Any] = {"id": revision_id}
    # Authorship
    row.update(
        {
            "author_fullname": author_cols["fullname"],
            "author_name": author_cols["name"],
            "author_email": author_cols["email"],
            "date": date_cols["timestamp"],
            "date_offset": date_cols["offset"],
            "date_neg_utc_offset": date_cols["neg_utc_offset"],
            "date_offset_bytes": date_cols["offset_bytes"],
        }
    )
    # Committership
    row.update(
        {
            "committer_fullname": committer_cols["fullname"],
            "committer_name": committer_cols["name"],
            "committer_email": committer_cols["email"],
            "committer_date": committer_date_cols["timestamp"],
            "committer_date_offset": committer_date_cols["offset"],
            "committer_date_neg_utc_offset": committer_date_cols["neg_utc_offset"],
            "committer_date_offset_bytes": committer_date_cols["offset_bytes"],
        }
    )
    # Everything else
    row.update(
        {
            "type": revision.type.value,
            "directory": revision.directory,
            "message": revision.message,
            "metadata": metadata,
            "synthetic": revision.synthetic,
            "extra_headers": revision.extra_headers,
            "raw_manifest": revision.raw_manifest,
            "parents": parent_rows,
        }
    )
    return row
[docs]
def db_to_optional_revision(db_revision: Dict[str, Any]) -> Optional[Revision]:
    """Convert a database row to a swh-model revision, if there is one.

    This is similar to :func:`db_to_revision`, but this returns :const:`None`
    instead of crashing if the revision does not exist (returned by an outer join).
    A row counts as "no revision" when its ``type`` column is None; every
    other column except ``id`` and ``parents`` is then asserted to be None too.
    """
    if db_revision["type"] is not None:
        return db_to_revision(db_revision)

    ignored_columns = ("id", "parents")
    assert not any(
        value is not None
        for (column, value) in db_revision.items()
        if column not in ignored_columns
    )
    return None
[docs]
def db_to_revision(db_revision: Dict[str, Any]) -> Revision:
    """Convert a database representation of a revision to its swh-model
    representation. Raises ValueError if required values are None.

    ``db_revision`` is not modified: when the extra headers have to be taken
    out of the legacy ``metadata`` column, they are removed from a copy.
    The ``parents`` key is optional; falsy parent entries are skipped.
    """
    author = db_to_author(
        db_revision["author_fullname"],
        db_revision["author_name"],
        db_revision["author_email"],
    )
    date = db_to_date(
        db_revision["date"],
        db_revision["date_offset_bytes"],
    )
    committer = db_to_author(
        db_revision["committer_fullname"],
        db_revision["committer_name"],
        db_revision["committer_email"],
    )
    committer_date = db_to_date(
        db_revision["committer_date"],
        db_revision["committer_date_offset_bytes"],
    )

    assert (author is None) == (
        db_revision["author_fullname"] is None
    ), "author is unexpectedly None"
    assert (committer is None) == (
        db_revision["committer_fullname"] is None
    ), "committer is unexpectedly None"

    if "parents" in db_revision:
        parents = tuple(parent for parent in db_revision["parents"] if parent)
    else:
        parents = ()

    metadata = db_revision["metadata"]
    extra_headers = db_revision["extra_headers"]
    if not extra_headers:
        if metadata and "extra_headers" in metadata:
            # Pop from a shallow copy so that the caller's row (and its
            # nested metadata dict) is left untouched.
            metadata = dict(metadata)
            extra_headers = db_to_git_headers(metadata.pop("extra_headers"))
        else:
            # For older versions of the database that were not migrated to schema v161
            extra_headers = ()

    return Revision(
        id=db_revision["id"],
        author=author,
        date=date,
        committer=committer,
        committer_date=committer_date,
        type=RevisionType(db_revision["type"]),
        directory=db_revision["directory"],
        message=db_revision["message"],
        metadata=metadata,
        synthetic=db_revision["synthetic"],
        extra_headers=extra_headers,
        parents=parents,
        raw_manifest=db_revision["raw_manifest"],
    )
[docs]
def release_to_db(release: Release) -> Dict[str, Any]:
    """Flatten a swh-model release into its database representation.

    The release message is stored under the ``comment`` key.
    """
    author_cols = author_to_db(release.author)
    date_cols = date_to_db(release.date)

    row: Dict[str, Any] = {"id": release.id}
    # Authorship
    row.update(
        {
            "author_fullname": author_cols["fullname"],
            "author_name": author_cols["name"],
            "author_email": author_cols["email"],
            "date": date_cols["timestamp"],
            "date_offset": date_cols["offset"],
            "date_neg_utc_offset": date_cols["neg_utc_offset"],
            "date_offset_bytes": date_cols["offset_bytes"],
        }
    )
    # The release itself
    row.update(
        {
            "name": release.name,
            "target": release.target,
            "target_type": release.target_type.value,
            "comment": release.message,
            "synthetic": release.synthetic,
            "raw_manifest": release.raw_manifest,
        }
    )
    return row
[docs]
def db_to_optional_release(db_release: Dict[str, Any]) -> Optional[Release]:
    """Convert a database row to a swh-model release, if there is one.

    This is similar to :func:`db_to_release`, but this returns :const:`None`
    instead of crashing if the release does not exist (returned by an outer join).
    A row counts as "no release" when its ``target_type`` column is None;
    every other column except ``id`` is then asserted to be None too.
    """
    if db_release["target_type"] is not None:
        return db_to_release(db_release)

    assert not any(
        value is not None
        for (column, value) in db_release.items()
        if column != "id"
    )
    return None
[docs]
def db_to_release(db_release: Dict[str, Any]) -> Release:
    """Rebuild a swh-model release from its database representation.

    The ``comment`` column becomes the release message.
    Raises ValueError if required values are None.
    """
    release_author = db_to_author(
        db_release["author_fullname"],
        db_release["author_name"],
        db_release["author_email"],
    )
    release_date = db_to_date(db_release["date"], db_release["date_offset_bytes"])

    release_id = db_release["id"]
    name = db_release["name"]
    message = db_release["comment"]
    synthetic = db_release["synthetic"]
    target = db_release["target"]
    target_type = ObjectType(db_release["target_type"])
    raw_manifest = db_release["raw_manifest"]

    return Release(
        author=release_author,
        date=release_date,
        id=release_id,
        name=name,
        message=message,
        synthetic=synthetic,
        target=target,
        target_type=target_type,
        raw_manifest=raw_manifest,
    )
[docs]
def db_to_extid(row) -> ExtID:
    """Build an ExtID from a database row.

    ``row`` is read through ``row[...]`` and ``row.get(...)``. The
    ``extid_version`` key is optional and defaults to 0; ``target_type`` is
    upper-cased and looked up by name among the core SWHID object types.
    """
    extid = row["extid"]
    extid_type = row["extid_type"]
    extid_version = row.get("extid_version", 0)

    target_id = row["target"]
    target_type = SwhidObjectType[row["target_type"].upper()]
    target = CoreSWHID(object_id=target_id, object_type=target_type)

    return ExtID(
        extid=extid,
        extid_type=extid_type,
        extid_version=extid_version,
        target=target,
    )
[docs]
def object_reference_to_db(
    reference: ObjectReference,
) -> Tuple[str, bytes, str, bytes]:
    """Flatten an object reference into a 4-tuple.

    Returns:
        ``(source type, source id, target type, target id)``, where each type
        is the lower-cased name of the corresponding ``object_type``.
    """
    source = reference.source
    target = reference.target

    source_type = source.object_type.name.lower()
    target_type = target.object_type.name.lower()

    return (source_type, source.object_id, target_type, target.object_id)
[docs]
def db_to_object_reference_source(row) -> ExtendedSWHID:
    """Build the extended SWHID of a reference's source from a database row.

    ``source_type`` is upper-cased and looked up by name among the extended
    SWHID object types; ``source`` is used as the object id.
    """
    source_id = row["source"]
    source_type = SwhidExtendedObjectType[row["source_type"].upper()]
    return ExtendedSWHID(object_id=source_id, object_type=source_type)